From a1015c7dcd3fb21c8fadc3e6ff43783072657af2 Mon Sep 17 00:00:00 2001 From: Michael Clifford Date: Fri, 18 Oct 2024 12:54:57 -0400 Subject: [PATCH] use knowledge data for phase 1 training and skills data for phase 2 Signed-off-by: Michael Clifford --- pipeline.py | 28 +++++++++--- pipeline.yaml | 90 ++++++++++++++++++++++++++++++++------- standalone/standalone.py | 85 ++++++++++++++++++++++++++++-------- standalone/standalone.tpl | 19 ++++++--- training/components.py | 32 +++++++++++--- 5 files changed, 203 insertions(+), 51 deletions(-) diff --git a/pipeline.py b/pipeline.py index 889b48f..3a5a9ac 100644 --- a/pipeline.py +++ b/pipeline.py @@ -146,12 +146,26 @@ def pipeline( size="1Gi", storage_class_name=storage_class_name, ) - sdg_to_pvc_task = artifact_to_pvc_op( - data=data_processing_task.outputs["processed_data"], pvc_path="/data" + + sdg_skills_to_pvc_task = artifact_to_pvc_op( + data=data_processing_task.outputs["skills_processed_data"], pvc_path="/data" ) - sdg_to_pvc_task.set_caching_options(False) + sdg_skills_to_pvc_task.set_caching_options(False) mount_pvc( - task=sdg_to_pvc_task, pvc_name=sdg_input_pvc_task.output, mount_path="/data" + task=sdg_skills_to_pvc_task, + pvc_name=sdg_input_pvc_task.output, + mount_path="/data", + ) + + sdg_knowledge_to_pvc_task = artifact_to_pvc_op( + data=data_processing_task.outputs["knowledge_processed_data"], + pvc_path="/data", + ) + sdg_knowledge_to_pvc_task.set_caching_options(False) + mount_pvc( + task=sdg_knowledge_to_pvc_task, + pvc_name=sdg_input_pvc_task.output, + mount_path="/data", ) output_pvc_task = CreatePVC( @@ -177,7 +191,7 @@ def pipeline( kubectl_apply_task = kubectl_apply_op( manifest=pytorchjob_manifest_task.outputs["manifest"] ) - kubectl_apply_task.after(sdg_to_pvc_task, model_to_pvc_task) + kubectl_apply_task.after(sdg_knowledge_to_pvc_task, model_to_pvc_task) kubectl_apply_task.set_caching_options(False) kubectl_wait_task = kubectl_wait_for_op( @@ -255,7 +269,7 @@ def pipeline( kubectl_apply_2_task = kubectl_apply_op( manifest=pytorchjob_manifest_2_task.outputs["manifest"] ) - kubectl_apply_2_task.after(sdg_to_pvc_task, model_to_pvc_task) + kubectl_apply_2_task.after(sdg_knowledge_to_pvc_task, model_to_pvc_task) kubectl_apply_2_task.set_caching_options(False) kubectl_wait_2_task = kubectl_wait_for_op( @@ -445,7 +459,7 @@ def gen_standalone(): # The list of executor names to extract details from to generate the standalone script executors = { - "exec-data-processing-op": 'data_processing_op(max_seq_len={MAX_SEQ_LEN}, max_batch_len={MAX_BATCH_LEN}, sdg="{DATA_PVC_SDG_PATH}", model="{DATA_PVC_MODEL_PATH}", processed_data="{PREPROCESSED_DATA_PATH}")', + "exec-data-processing-op": 'data_processing_op(max_seq_len={MAX_SEQ_LEN}, max_batch_len={MAX_BATCH_LEN}, sdg="{DATA_PVC_SDG_PATH}", model="{DATA_PVC_MODEL_PATH}", skills_processed_data="{PREPROCESSED_DATA_PATH_SKILLS}", knowledge_processed_data="{PREPROCESSED_DATA_PATH_KNOWLEDGE}")', "exec-sdg-op": 'sdg_op(num_instructions_to_generate={num_instructions_to_generate}, repo_branch="{exec_git_clone_op_repo_branch}", repo_pr={exec_git_clone_op_repo_pr}, taxonomy="{TAXONOMY_DATA_PATH}", sdg="{SDG_GENERATED_DATA_PATH}")', "exec-git-clone-op": {}, "exec-huggingface-importer-op": 'huggingface_importer_op(repo_name="{REPO_GRANITE_7B_IMAGE}", model="{DATA_PVC_MODEL_PATH}")', diff --git a/pipeline.yaml b/pipeline.yaml index 1155e31..eceb862 100644 --- a/pipeline.yaml +++ b/pipeline.yaml @@ -39,6 +39,17 @@ components: parameters: pvc_path: parameterType: STRING + 
comp-artifact-to-pvc-op-3: + executorLabel: exec-artifact-to-pvc-op-3 + inputDefinitions: + artifacts: + data: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + parameters: + pvc_path: + parameterType: STRING comp-createpvc: executorLabel: exec-createpvc inputDefinitions: @@ -245,7 +256,11 @@ components: parameterType: NUMBER_INTEGER outputDefinitions: artifacts: - processed_data: + knowledge_processed_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + skills_processed_data: artifactType: schemaTitle: system.Dataset schemaVersion: 0.0.1 @@ -539,6 +554,14 @@ deploymentSpec: - /bin/sh - -c image: registry.access.redhat.com/ubi9/toolbox + exec-artifact-to-pvc-op-3: + container: + args: + - cp -r {{$.inputs.artifacts['data'].path}} {{$.inputs.parameters['pvc_path']}} + command: + - /bin/sh + - -c + image: registry.access.redhat.com/ubi9/toolbox exec-createpvc: container: image: argostub/createpvc @@ -575,16 +598,26 @@ deploymentSpec: ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef data_processing_op(\n sdg: dsl.Input[dsl.Dataset],\n processed_data:\ - \ dsl.Output[dsl.Dataset],\n model: dsl.Input[dsl.Artifact],\n max_seq_len:\ - \ Optional[int] = 4096,\n max_batch_len: Optional[int] = 20000,\n):\n\ - \ import os\n\n import instructlab.training.data_process as dp\n \ - \ from instructlab.training import (\n DataProcessArgs,\n \ - \ TrainingArgs,\n )\n\n # define training-specific arguments\n \ - \ training_args = TrainingArgs(\n # define data-specific arguments\n\ - \ model_path=model.path,\n data_path=f\"{sdg.path}/*_train_msgs*.jsonl\"\ - ,\n data_output_dir=processed_data.path,\n # define model-trianing\ - \ parameters\n max_seq_len=max_seq_len,\n max_batch_len=max_batch_len,\n\ + \ *\n\ndef data_processing_op(\n sdg: dsl.Input[dsl.Dataset],\n skills_processed_data:\ + \ dsl.Output[dsl.Dataset],\n knowledge_processed_data: dsl.Output[dsl.Dataset],\n\ + \ model: dsl.Input[dsl.Artifact],\n max_seq_len: Optional[int] = 4096,\n\ + \ max_batch_len: Optional[int] = 20000,\n):\n import os\n\n import\ + \ instructlab.training.data_process as dp\n from instructlab.training\ + \ import (\n DataProcessArgs,\n TrainingArgs,\n )\n\n \ + \ # define training-specific arguments\n skill_training_args = TrainingArgs(\n\ + \ # define data-specific arguments\n model_path=model.path,\n\ + \ data_path=f\"{sdg.path}/skills_train_msgs*.jsonl\",\n data_output_dir=skills_processed_data.path,\n\ + \ # define model-trianing parameters\n max_seq_len=max_seq_len,\n\ + \ max_batch_len=max_batch_len,\n # XXX(shanand): We don't\ + \ need the following arguments\n # for data processing. Added them\ + \ for now to avoid\n # Pydantic validation errors for TrainingArgs\n\ + \ ckpt_output_dir=\"data/saved_checkpoints\",\n num_epochs=2,\n\ + \ effective_batch_size=3840,\n save_samples=0,\n learning_rate=2e-6,\n\ + \ warmup_steps=800,\n is_padding_free=True,\n )\n\n \ + \ knowledge_training_args = TrainingArgs(\n # define data-specific\ + \ arguments\n model_path=model.path,\n data_path=f\"{sdg.path}/knowledge_train_msgs*.jsonl\"\ + ,\n data_output_dir=knowledge_processed_data.path,\n # define\ + \ model-trianing parameters\n max_seq_len=max_seq_len,\n max_batch_len=max_batch_len,\n\ \ # XXX(shanand): We don't need the following arguments\n \ \ # for data processing. 
Added them for now to avoid\n # Pydantic\ \ validation errors for TrainingArgs\n ckpt_output_dir=\"data/saved_checkpoints\"\ @@ -607,8 +640,8 @@ deploymentSpec: \ data_output_path=train_args.data_output_dir,\n \ \ model_path=train_args.model_path,\n data_path=train_args.data_path,\n\ \ max_seq_len=train_args.max_seq_len,\n chat_tmpl_path=train_args.chat_tmpl_path,\n\ - \ )\n )\n\n data_processing(train_args=training_args)\n\ - \n" + \ )\n )\n\n data_processing(train_args=skill_training_args)\n\ + \ data_processing(train_args=knowledge_training_args)\n\n" image: registry.access.redhat.com/ubi9/python-311:latest exec-deletepvc: container: @@ -1450,7 +1483,7 @@ root: artifacts: data: taskOutputArtifact: - outputArtifactKey: processed_data + outputArtifactKey: skills_processed_data producerTask: data-processing-op parameters: pvc_path: @@ -1458,6 +1491,25 @@ root: constant: /data taskInfo: name: artifact-to-pvc-op-2 + artifact-to-pvc-op-3: + cachingOptions: {} + componentRef: + name: comp-artifact-to-pvc-op-3 + dependentTasks: + - createpvc-2 + - data-processing-op + inputs: + artifacts: + data: + taskOutputArtifact: + outputArtifactKey: knowledge_processed_data + producerTask: data-processing-op + parameters: + pvc_path: + runtimeValue: + constant: /data + taskInfo: + name: artifact-to-pvc-op-3 createpvc: cachingOptions: enableCache: true @@ -1624,7 +1676,7 @@ root: name: comp-kubectl-apply-op dependentTasks: - artifact-to-pvc-op - - artifact-to-pvc-op-2 + - artifact-to-pvc-op-3 - pytorchjob-manifest-op inputs: parameters: @@ -1640,7 +1692,7 @@ root: name: comp-kubectl-apply-op-2 dependentTasks: - artifact-to-pvc-op - - artifact-to-pvc-op-2 + - artifact-to-pvc-op-3 - pytorchjob-manifest-op-2 inputs: parameters: @@ -1972,6 +2024,12 @@ platforms: taskOutputParameter: outputParameterKey: name producerTask: createpvc-2 + exec-artifact-to-pvc-op-3: + pvcMount: + - mountPath: /data + taskOutputParameter: + outputParameterKey: name + producerTask: createpvc-2 exec-list-models-in-directory-op: pvcMount: - mountPath: /output/model diff --git a/standalone/standalone.py b/standalone/standalone.py index 0384d17..eb9aba4 100755 --- a/standalone/standalone.py +++ b/standalone/standalone.py @@ -71,6 +71,8 @@ DATA_PVC_OUTPUT_PATH = path.join(DATA_PVC_MOUNT_PATH, "output") DATA_PVC_OUTPUT_DATA_PATH = path.join(DATA_PVC_OUTPUT_PATH, "data") PREPROCESSED_DATA_PATH = path.join(DATA_PVC_SDG_PATH, "processed_data") +PREPROCESSED_DATA_SKILLS_PATH = path.join(PREPROCESSED_DATA_PATH, "skills") +PREPROCESSED_DATA_KNOWLEDGE_PATH = path.join(PREPROCESSED_DATA_PATH, "knowledge") MT_BENCH_OUTPUT_PATH = path.join(DATA_PVC_MOUNT_PATH, "mt-bench-results.txt") MT_BENCH_SCORES_PATH = path.join(DATA_PVC_MOUNT_PATH, "mt-bench-best.txt") MT_BENCH_BRANCH_SCORES_PATH = path.join(DATA_PVC_MOUNT_PATH, "mt-bench-branch-best.txt") @@ -117,11 +119,13 @@ - args: - | phase_num={phase_num} - processed_data_path={preprocessed_data_path} + PATH_TO_DATA={preprocessed_data_knowledge_path} + if [ "$phase_num" -eq 2 ]; then PATH_TO_DATA="{preprocessed_data_skills_path}"; fi echo "Running phase $phase_num" PATH_TO_MODEL={path_to_model} if [ "$phase_num" -eq 2 ]; then PATH_TO_MODEL="{path_to_model}/output/phase_1/hf_format/$(ls --sort=time {path_to_model}/output/phase_1/hf_format|head -n 1)"; fi echo "Using $PATH_TO_MODEL model for training" + echo "Using $PATH_TO_DATA data for training" mkdir -p {data_pvc_model_path}; mkdir -p {data_pvc_sdg_path}; mkdir -p {path_to_model}/output/phase_{phase_num} @@ -131,7 +135,7 @@ --rdzv_endpoint 
$(MASTER_ADDR):$(MASTER_PORT) \ -m instructlab.training.main_ds \ --model_name_or_path="$PATH_TO_MODEL" \ - --data_path="$processed_data_path"/data.jsonl \ + --data_path="$PATH_TO_DATA"/data.jsonl \ --output_dir={path_to_model}/output/phase_{phase_num} \ --num_epochs={epoch_num} \ --effective_batch_size=3840 \ @@ -191,11 +195,13 @@ - args: - | phase_num={phase_num} - processed_data_path={preprocessed_data_path} + PATH_TO_DATA={preprocessed_data_knowledge_path} + if [ "$phase_num" -eq 2 ]; then PATH_TO_DATA="{preprocessed_data_skills_path}"; fi echo "Running phase $phase_num" PATH_TO_MODEL={path_to_model} if [ "$phase_num" -eq 2 ]; then PATH_TO_MODEL="{path_to_model}/output/phase_1/hf_format/$(ls --sort=time {path_to_model}/output/phase_1/hf_format|head -n 1)"; fi echo "Using $PATH_TO_MODEL model for training" + echo "Using $PATH_TO_DATA data for training" tmp_model=$(mktemp -d) mkdir -p "$tmp_model"; torchrun --nnodes {nnodes} \ @@ -204,7 +210,7 @@ --rdzv_endpoint $(MASTER_ADDR):$(MASTER_PORT) \ -m instructlab.training.main_ds \ --model_name_or_path="$PATH_TO_MODEL" \ - --data_path="$processed_data_path"/data.jsonl \ + --data_path="$PATH_TO_DATA"/data.jsonl \ --output_dir="$tmp_model" \ --num_epochs={epoch_num} \ --effective_batch_size=3840 \ @@ -1071,7 +1077,8 @@ def huggingface_importer_op(model: str, repo_name: str): def data_processing_op( sdg: str, - processed_data: str, + skills_processed_data: str, + knowledge_processed_data: str, model: str, max_seq_len: Optional[int] = 4096, max_batch_len: Optional[int] = 20000, @@ -1085,11 +1092,31 @@ def data_processing_op( ) # define training-specific arguments - training_args = TrainingArgs( + skill_training_args = TrainingArgs( # define data-specific arguments model_path=model, - data_path=f"{sdg}/*_train_msgs*.jsonl", - data_output_dir=processed_data, + data_path=f"{sdg}/skills_train_msgs*.jsonl", + data_output_dir=skills_processed_data, + # define model-trianing parameters + max_seq_len=max_seq_len, + max_batch_len=max_batch_len, + # XXX(shanand): We don't need the following arguments + # for data processing. 
Added them for now to avoid + # Pydantic validation errors for TrainingArgs + ckpt_output_dir="data/saved_checkpoints", + num_epochs=2, + effective_batch_size=3840, + save_samples=0, + learning_rate=2e-6, + warmup_steps=800, + is_padding_free=True, + ) + + knowledge_training_args = TrainingArgs( + # define data-specific arguments + model_path=model, + data_path=f"{sdg}/knowledge_train_msgs*.jsonl", + data_output_dir=knowledge_processed_data, # define model-trianing parameters max_seq_len=max_seq_len, max_batch_len=max_batch_len, @@ -1131,10 +1158,11 @@ def data_processing(train_args: TrainingArgs) -> None: ) ) - data_processing(train_args=training_args) + data_processing(train_args=skill_training_args) + data_processing(train_args=knowledge_training_args) """ exec_data_processing_op_args = f""" -data_processing_op(max_seq_len={MAX_SEQ_LEN}, max_batch_len={MAX_BATCH_LEN}, sdg="{DATA_PVC_SDG_PATH}", model="{DATA_PVC_MODEL_PATH}", processed_data="{PREPROCESSED_DATA_PATH}") +data_processing_op(max_seq_len={MAX_SEQ_LEN}, max_batch_len={MAX_BATCH_LEN}, sdg="{DATA_PVC_SDG_PATH}", model="{DATA_PVC_MODEL_PATH}", skills_processed_data="{PREPROCESSED_DATA_PATH_SKILLS}", knowledge_processed_data="{PREPROCESSED_DATA_PATH_KNOWLEDGE}") """ exec_git_clone_op_args = literal_eval(""" ['git clone {exec_git_clone_op_repo_url} {TAXONOMY_PATH} && cd {TAXONOMY_PATH} && if [ -n "{exec_git_clone_op_repo_branch}" ]; then git fetch origin {exec_git_clone_op_repo_branch} && git checkout {exec_git_clone_op_repo_branch}; elif [ -n "{exec_git_clone_op_repo_pr}" ] && [ {exec_git_clone_op_repo_pr} -gt 0 ]; then git fetch origin pull/{exec_git_clone_op_repo_pr}/head:{exec_git_clone_op_repo_pr} && git checkout {exec_git_clone_op_repo_pr}; fi '] @@ -1289,7 +1317,8 @@ def create_data_job( def data_processing_op( sdg: str, - processed_data: str, + skills_processed_data: str, + knowledge_processed_data: str, model: str, max_seq_len: Optional[int] = 4096, max_batch_len: Optional[int] = 20000, @@ -1303,11 +1332,31 @@ def data_processing_op( ) # define training-specific arguments - training_args = TrainingArgs( + skill_training_args = TrainingArgs( + # define data-specific arguments + model_path=model, + data_path=f"{sdg}/skills_train_msgs*.jsonl", + data_output_dir=skills_processed_data, + # define model-trianing parameters + max_seq_len=max_seq_len, + max_batch_len=max_batch_len, + # XXX(shanand): We don't need the following arguments + # for data processing. 
Added them for now to avoid + # Pydantic validation errors for TrainingArgs + ckpt_output_dir="data/saved_checkpoints", + num_epochs=2, + effective_batch_size=3840, + save_samples=0, + learning_rate=2e-6, + warmup_steps=800, + is_padding_free=True, + ) + + knowledge_training_args = TrainingArgs( # define data-specific arguments model_path=model, - data_path=f"{sdg}/*_train_msgs*.jsonl", - data_output_dir=processed_data, + data_path=f"{sdg}/knowledge_train_msgs*.jsonl", + data_output_dir=knowledge_processed_data, # define model-trianing parameters max_seq_len=max_seq_len, max_batch_len=max_batch_len, @@ -1349,10 +1398,11 @@ def data_processing(train_args: TrainingArgs) -> None: ) ) - data_processing(train_args=training_args) + data_processing(train_args=skill_training_args) + data_processing(train_args=knowledge_training_args) """ exec_data_processing_op_args = f""" -data_processing_op(max_seq_len={MAX_SEQ_LEN}, max_batch_len={MAX_BATCH_LEN}, sdg="{DATA_PVC_SDG_PATH}", model="{DATA_PVC_MODEL_PATH}", processed_data="{PREPROCESSED_DATA_PATH}") +data_processing_op(max_seq_len={MAX_SEQ_LEN}, max_batch_len={MAX_BATCH_LEN}, sdg="{DATA_PVC_SDG_PATH}", model="{DATA_PVC_MODEL_PATH}", skills_processed_data="{PREPROCESSED_DATA_SKILLS_PATH}", knowledge_processed_data="{PREPROCESSED_DATA_KNOWLEDGE_PATH}") """ data_container = kubernetes.client.V1Container( @@ -2888,7 +2938,8 @@ def train( phase_num=training_phase, data_pvc_model_path=DATA_PVC_MODEL_PATH, data_pvc_sdg_path=DATA_PVC_SDG_PATH, - preprocessed_data_path=PREPROCESSED_DATA_PATH, + preprocessed_data_skills_path=PREPROCESSED_DATA_SKILLS_PATH, + preprocessed_data_knowledge_path=PREPROCESSED_DATA_KNOWLEDGE_PATH, ) ) diff --git a/standalone/standalone.tpl b/standalone/standalone.tpl index cb7ad53..04951fe 100755 --- a/standalone/standalone.tpl +++ b/standalone/standalone.tpl @@ -71,6 +71,8 @@ TAXONOMY_PATH = path.join(DATA_PVC_MOUNT_PATH, "taxonomy") DATA_PVC_OUTPUT_PATH = path.join(DATA_PVC_MOUNT_PATH, "output") DATA_PVC_OUTPUT_DATA_PATH = path.join(DATA_PVC_OUTPUT_PATH, "data") PREPROCESSED_DATA_PATH = path.join(DATA_PVC_SDG_PATH, "processed_data") +PREPROCESSED_DATA_SKILLS_PATH = path.join(PREPROCESSED_DATA_PATH, "skills") +PREPROCESSED_DATA_KNOWLEDGE_PATH = path.join(PREPROCESSED_DATA_PATH, "knowledge") MT_BENCH_OUTPUT_PATH = path.join(DATA_PVC_MOUNT_PATH, "mt-bench-results.txt") MT_BENCH_SCORES_PATH = path.join(DATA_PVC_MOUNT_PATH, "mt-bench-best.txt") MT_BENCH_BRANCH_SCORES_PATH = path.join(DATA_PVC_MOUNT_PATH, "mt-bench-branch-best.txt") @@ -117,11 +119,13 @@ spec: - args: - | phase_num={phase_num} - processed_data_path={preprocessed_data_path} + PATH_TO_DATA={preprocessed_data_knowledge_path} + if [ "$phase_num" -eq 2 ]; then PATH_TO_DATA="{preprocessed_data_skills_path}"; fi echo "Running phase $phase_num" PATH_TO_MODEL={path_to_model} if [ "$phase_num" -eq 2 ]; then PATH_TO_MODEL="{path_to_model}/output/phase_1/hf_format/$(ls --sort=time {path_to_model}/output/phase_1/hf_format|head -n 1)"; fi echo "Using $PATH_TO_MODEL model for training" + echo "Using $PATH_TO_DATA data for training" mkdir -p {data_pvc_model_path}; mkdir -p {data_pvc_sdg_path}; mkdir -p {path_to_model}/output/phase_{phase_num} @@ -131,7 +135,7 @@ spec: --rdzv_endpoint $(MASTER_ADDR):$(MASTER_PORT) \ -m instructlab.training.main_ds \ --model_name_or_path="$PATH_TO_MODEL" \ - --data_path="$processed_data_path"/data.jsonl \ + --data_path="$PATH_TO_DATA"/data.jsonl \ --output_dir={path_to_model}/output/phase_{phase_num} \ --num_epochs={epoch_num} \ 
--effective_batch_size=3840 \ @@ -191,11 +195,13 @@ spec: - args: - | phase_num={phase_num} - processed_data_path={preprocessed_data_path} + PATH_TO_DATA={preprocessed_data_knowledge_path} + if [ "$phase_num" -eq 2 ]; then PATH_TO_DATA="{preprocessed_data_skills_path}"; fi echo "Running phase $phase_num" PATH_TO_MODEL={path_to_model} if [ "$phase_num" -eq 2 ]; then PATH_TO_MODEL="{path_to_model}/output/phase_1/hf_format/$(ls --sort=time {path_to_model}/output/phase_1/hf_format|head -n 1)"; fi echo "Using $PATH_TO_MODEL model for training" + echo "Using $PATH_TO_DATA data for training" tmp_model=$(mktemp -d) mkdir -p "$tmp_model"; torchrun --nnodes {nnodes} \ @@ -204,7 +210,7 @@ spec: --rdzv_endpoint $(MASTER_ADDR):$(MASTER_PORT) \ -m instructlab.training.main_ds \ --model_name_or_path="$PATH_TO_MODEL" \ - --data_path="$processed_data_path"/data.jsonl \ + --data_path="$PATH_TO_DATA"/data.jsonl \ --output_dir="$tmp_model" \ --num_epochs={epoch_num} \ --effective_batch_size=3840 \ @@ -1181,7 +1187,7 @@ def create_data_job( {{exec_data_processing_op_command}} """ exec_data_processing_op_args = f""" -data_processing_op(max_seq_len={MAX_SEQ_LEN}, max_batch_len={MAX_BATCH_LEN}, sdg="{DATA_PVC_SDG_PATH}", model="{DATA_PVC_MODEL_PATH}", processed_data="{PREPROCESSED_DATA_PATH}") +data_processing_op(max_seq_len={MAX_SEQ_LEN}, max_batch_len={MAX_BATCH_LEN}, sdg="{DATA_PVC_SDG_PATH}", model="{DATA_PVC_MODEL_PATH}", skills_processed_data="{PREPROCESSED_DATA_SKILLS_PATH}", knowledge_processed_data="{PREPROCESSED_DATA_KNOWLEDGE_PATH}") """ data_container = kubernetes.client.V1Container( @@ -2071,7 +2077,8 @@ def train( phase_num=training_phase, data_pvc_model_path=DATA_PVC_MODEL_PATH, data_pvc_sdg_path=DATA_PVC_SDG_PATH, - preprocessed_data_path=PREPROCESSED_DATA_PATH, + preprocessed_data_skills_path=PREPROCESSED_DATA_SKILLS_PATH, + preprocessed_data_knowledge_path=PREPROCESSED_DATA_KNOWLEDGE_PATH, ) ) diff --git a/training/components.py b/training/components.py index 79d7142..e8960a9 100644 --- a/training/components.py +++ b/training/components.py @@ -16,7 +16,8 @@ ) def data_processing_op( sdg: dsl.Input[dsl.Dataset], - processed_data: dsl.Output[dsl.Dataset], + skills_processed_data: dsl.Output[dsl.Dataset], + knowledge_processed_data: dsl.Output[dsl.Dataset], model: dsl.Input[dsl.Artifact], max_seq_len: Optional[int] = 4096, max_batch_len: Optional[int] = 20000, @@ -30,11 +31,31 @@ def data_processing_op( ) # define training-specific arguments - training_args = TrainingArgs( + skill_training_args = TrainingArgs( # define data-specific arguments model_path=model.path, - data_path=f"{sdg.path}/*_train_msgs*.jsonl", - data_output_dir=processed_data.path, + data_path=f"{sdg.path}/skills_train_msgs*.jsonl", + data_output_dir=skills_processed_data.path, + # define model-trianing parameters + max_seq_len=max_seq_len, + max_batch_len=max_batch_len, + # XXX(shanand): We don't need the following arguments + # for data processing. 
Added them for now to avoid + # Pydantic validation errors for TrainingArgs + ckpt_output_dir="data/saved_checkpoints", + num_epochs=2, + effective_batch_size=3840, + save_samples=0, + learning_rate=2e-6, + warmup_steps=800, + is_padding_free=True, + ) + + knowledge_training_args = TrainingArgs( + # define data-specific arguments + model_path=model.path, + data_path=f"{sdg.path}/knowledge_train_msgs*.jsonl", + data_output_dir=knowledge_processed_data.path, # define model-trianing parameters max_seq_len=max_seq_len, max_batch_len=max_batch_len, @@ -76,7 +97,8 @@ def data_processing(train_args: TrainingArgs) -> None: ) ) - data_processing(train_args=training_args) + data_processing(train_args=skill_training_args) + data_processing(train_args=knowledge_training_args) @dsl.component(base_image=PYTHON_IMAGE)