diff --git a/nlp/llm/deepseek_moe_7b/colossalai/README.md b/nlp/llm/deepseek_moe_7b/colossalai/README.md
index 4a02e88a184721793ac96959fd24402e4da489cf..0c08cfea36f55a4b76b0036bc20813c8c2f5b170 100644
--- a/nlp/llm/deepseek_moe_7b/colossalai/README.md
+++ b/nlp/llm/deepseek_moe_7b/colossalai/README.md
@@ -10,15 +10,29 @@ DeepSeekMoE 16B is a Mixture-of-Experts (MoE) language model with 16.4B paramete

 First, ensure that ColossalAI is installed in the environment; it is generally installed by default.

+```sh
+git clone -b v0.4.4 https://github.com/hpcaitech/ColossalAI.git --depth=1
+cd ColossalAI
+cp -rf /toolbox/ColossalAI/v0.4.4/patches/* ./
+pip3 install .
+```
+
 ## Step 2: Prepare model and config

-Get "deepseek-moe-16b-base" models and config file from huggingface or other place, and mv it to "/home/model_zoos/nlp/deepseek-moe-16b-base".
+Get the "deepseek-moe-16b-base" model weights and config files from Hugging Face or another source, and move them to "ColossalAI/examples/language/deepseek/deepseek-ai/deepseek-moe-16b-base".
 One recommended link: "".

+```bash
+cd ColossalAI/examples/language/deepseek
+mkdir -p deepseek-ai
+mv /deepseek-moe-16b-base deepseek-ai/
+```
+
 ## Step 3: Training

 ```bash
-bash deepseek_moe_7b_pretrain.sh
+cd ColossalAI/examples/language/deepseek
+colossalai run --nproc_per_node 16 benchmark.py -c 7b -g -b 16 --tp 1 --pp 4 --num_steps 50
 ```

 ## Results
diff --git a/nlp/llm/deepseek_moe_7b/colossalai/benchmark.py b/nlp/llm/deepseek_moe_7b/colossalai/benchmark.py
deleted file mode 100644
index a7e43ccb424e3821405273cf971bbc0cd519a265..0000000000000000000000000000000000000000
--- a/nlp/llm/deepseek_moe_7b/colossalai/benchmark.py
+++ /dev/null
@@ -1,275 +0,0 @@
-# Copyright (c) 2024, Shanghai Iluvatar CoreX Semiconductor Co., Ltd.
-# All Rights Reserved.
-# modified from mixtral benchmark
-import argparse
-import resource
-import time
-import warnings
-from contextlib import nullcontext
-
-import torch
-import torch.distributed as dist
-from data_utils import RandomDataset
-from model_utils import format_numel_str, get_model_numel
-from performance_evaluator import PerformanceEvaluator, get_profile_context
-from tqdm import tqdm
-from transformers import AutoConfig, AutoModelForCausalLM
-
-import colossalai
-from colossalai.accelerator import get_accelerator
-from colossalai.booster import Booster
-from colossalai.booster.plugin import MoeHybridParallelPlugin
-from colossalai.cluster import DistCoordinator
-from colossalai.lazy import LazyInitContext
-from colossalai.nn.optimizer import HybridAdam
-from colossalai.shardformer import PipelineGradientCheckpointConfig
-
-warnings.filterwarnings("ignore")
-# ==============================
-# Constants
-# ==============================
-
-# We have lots of llamas for your choice!
-# deepseek-ai/deepseek-moe-16b-base model_path -MODEL_CONFIGS = { - "100m": lambda model_path: AutoConfig.from_pretrained( - model_path, - max_position_embeddings=4096, - num_hidden_layers=1, - num_attention_heads=32, - intermediate_size=512, - moe_intermediate_size=128, - hidden_size=512, - n_routed_experts=8, - n_shared_experts=4, - num_experts_per_tok=2, - first_k_dense_replace=0, - attn_implementation="flash_attention_2", - trust_remote_code=True, - ), - "7b": lambda model_path: AutoConfig.from_pretrained( - model_path, - max_position_embeddings=4096, - num_hidden_layers=13, - attn_implementation="flash_attention_2", - trust_remote_code=True, - ), - "14b": lambda model_path: AutoConfig.from_pretrained( - model_path, - max_position_embeddings=4096, - num_hidden_layers=26, - attn_implementation="flash_attention_2", - trust_remote_code=True, - ), -} - - -def main(): - # ============================== - # Parse Arguments - # ============================== - parser = argparse.ArgumentParser() - parser.add_argument("-c", "--config", type=str, default="100m", help="Model configuration") - parser.add_argument( - "-p", - "--plugin", - choices=["3d"], - default="3d", - help="Choose which plugin to use", - ) - parser.add_argument("-b", "--batch_size", type=int, default=1, help="Batch size") - parser.add_argument("-s", "--num_steps", type=int, default=5, help="Number of steps to run") - parser.add_argument("-i", "--ignore_steps", type=int, default=2, help="Number of steps to ignore") - parser.add_argument("-g", "--grad_checkpoint", action="store_true", help="Use gradient checkpointing") - parser.add_argument("-l", "--max_length", type=int, default=4096, help="Max sequence length") - parser.add_argument( - "-w", "--warmup_ratio", type=float, default=0.8, help="warm up ratio of non-model data. Only for gemini-auto" - ) - parser.add_argument("-m", "--memory_limit", type=int, help="Gemini memory limit in mb") - parser.add_argument("-x", "--xformers", action="store_true", help="Use xformers") - parser.add_argument("--shard_param_frac", type=float, default=1.0, help="Shard param fraction. Only for gemini") - parser.add_argument("--offload_optim_frac", type=float, default=0.0, help="Offload optim fraction. Only for gemini") - parser.add_argument("--offload_param_frac", type=float, default=0.0, help="Offload param fraction. Only for gemini") - parser.add_argument("--tp", type=int, default=1, help="Tensor parallel size") - parser.add_argument("--ep", type=int, default=1, help="Expert parallel size") - parser.add_argument("--sp", type=int, default=1, help="Sequence parallel size") - parser.add_argument("--extra_dp", type=int, default=1, help="Extra data parallel size, used for Gemini") - parser.add_argument("--pp", type=int, default=1, help="Pipeline parallel size") - parser.add_argument("--mbs", type=int, default=1, help="Micro batch size of pipeline parallel") - parser.add_argument("--zero", type=int, default=1, help="Zero Stage when hybrid plugin is enabled") - parser.add_argument("--custom-ckpt", action="store_true", help="Customize checkpoint", default=False) - - parser.add_argument("--pp_style", default="1f1b", choices=["1f1b", "interleaved"]) - parser.add_argument("--n_chunks", default=1, help="number of model chunks", type=eval) - parser.add_argument("--profile", action="store_true", help="Profile the code") - parser.add_argument( - "--nsys", - action="store_true", - help="Use nsys for profiling. 
\ - You should put something like this before colossalai launch: \ - nsys profile -w true -t cuda,cudnn,cublas -s cpu --capture-range=cudaProfilerApi --capture-range-end=stop --cudabacktrace=true -x true --python-backtrace=cuda -o prof_out", - ) - parser.add_argument("--disable-async-reduce", action="store_true", help="Disable the asynchronous reduce operation") - parser.add_argument("--prefetch_num", type=int, default=0, help="chunk prefetch max number") - parser.add_argument("--no_cache", action="store_true") - parser.add_argument("--use_fp8_comm", action="store_true", default=False, help="for using fp8 during communication") - parser.add_argument("--use_fp8", action="store_true", default=False, help="for using fp8 linear") - parser.add_argument("--overlap_allgather", action="store_true") - parser.add_argument( - "--sp_mode", - default="all_to_all", - choices=["all_to_all"], - help="Sequence parallelism mode", - ) - parser.add_argument("--debug", action="store_true", help="Enable debug mode") - parser.add_argument("--model_path", type=str, default=None, help="the path of model and config") - args = parser.parse_args() - - colossalai.launch_from_torch() - coordinator = DistCoordinator() - coordinator.print_on_master(f"args:{args}") - # ckpt config for LLaMA3-70B on 64 H100 GPUs - hybrid_kwargs = ( - { - "gradient_checkpoint_config": PipelineGradientCheckpointConfig( - num_ckpt_layers_per_stage=[19, 19, 19, 13], - ), - "num_layers_per_stage": [19, 20, 20, 21], - "pp_style": "interleaved", - } - if args.custom_ckpt - else {} - ) - - # ============================== - # Initialize Booster - # ============================== - if args.plugin == "3d": - plugin = MoeHybridParallelPlugin( - ep_size=args.ep, - tp_size=args.tp, - pp_size=args.pp, - pp_style=args.pp_style, - num_model_chunks=args.n_chunks, - zero_stage=args.zero, - sp_size=args.sp, - sequence_parallelism_mode=args.sp_mode, - enable_sequence_parallelism=args.sp > 1, - enable_fused_normalization=torch.cuda.is_available(), - enable_flash_attention=args.xformers, - microbatch_size=args.mbs, - precision="bf16", - enable_metadata_cache=not args.no_cache, - overlap_allgather=args.overlap_allgather, - use_fp8=args.use_fp8, - fp8_communication=args.use_fp8_comm, - **hybrid_kwargs, - ) - else: - raise ValueError(f"Unknown plugin {args.plugin}") - - booster = Booster(plugin=plugin) - - # ============================== - # Initialize Dataset and Dataloader - # ============================== - dp_size = getattr(plugin, "dp_size", coordinator.world_size) - - config = MODEL_CONFIGS[args.config](args.model_path) - - torch.cuda.manual_seed(42) - - dataset = RandomDataset( - num_samples=args.batch_size * args.num_steps * dp_size, max_length=args.max_length, vocab_size=config.vocab_size - ) - dataloader = plugin.prepare_dataloader(dataset, batch_size=args.batch_size, shuffle=True, drop_last=True, seed=42) - - # ============================== - # Initialize Model and Optimizer - # ============================== - init_ctx = ( - LazyInitContext(default_device=get_accelerator().get_current_device()) - if isinstance(plugin, MoeHybridParallelPlugin) - else nullcontext() - ) - - with init_ctx: - model = AutoModelForCausalLM.from_config(config, trust_remote_code=True).to(torch.bfloat16) - - if args.grad_checkpoint: - model.gradient_checkpointing_enable() - - model_numel = get_model_numel(model) - coordinator.print_on_master(f"Model params: {format_numel_str(model_numel)}") - performance_evaluator = PerformanceEvaluator( - model_numel, - 
model.config.num_hidden_layers, - model.config.hidden_size, - model.config.vocab_size, - args.grad_checkpoint, - args.ignore_steps, - dp_world_size=dp_size, - ) - - optimizer = HybridAdam(model.parameters()) - torch.set_default_dtype(torch.bfloat16) - model, optimizer, _, dataloader, _ = booster.boost(model, optimizer, dataloader=dataloader) - - torch.set_default_dtype(torch.float) - coordinator.print_on_master( - f"Booster init max CUDA memory: {get_accelerator().max_memory_allocated()/1024**2:.2f} MB" - ) - coordinator.print_on_master( - f"Booster init max CPU memory: {resource.getrusage(resource.RUSAGE_SELF).ru_maxrss/1024:.2f} MB" - ) - - with get_profile_context( - args.profile, - args.ignore_steps, - 1, # avoid creating massive log files - save_dir=f"profile/{time.strftime('%H:%M', time.localtime())}-{args.plugin}-llama-{args.config}", - nsys=args.nsys, - ) as prof: # , distributed_debug_mode(10, enable=True): - if isinstance(plugin, MoeHybridParallelPlugin) and args.pp > 1: - data_iter = iter(dataloader) - for step in tqdm(range(len(dataloader)), desc="Step", disable=not coordinator.is_master()): - performance_evaluator.on_step_start(step) - outputs = booster.execute_pipeline( - data_iter, - model, - criterion=lambda outputs, inputs: outputs[0], - optimizer=optimizer, - return_loss=True, - ) - loss = outputs["loss"] - if dist.get_rank() == dist.get_world_size() - 1: - print(f"Step {step} loss: {loss}") - optimizer.step() - optimizer.zero_grad() - - performance_evaluator.on_step_end(input_ids=torch.empty(args.batch_size, args.max_length)) - prof.step() - # print(f"rank {dist.get_rank()} step {step} passed") - else: - for step, batch in enumerate(tqdm(dataloader, desc="Step", disable=not coordinator.is_master())): - performance_evaluator.on_step_start(step) - outputs = model(**batch) - loss = outputs[0] - del outputs # free memory - - if dist.get_rank() == dist.get_world_size() - 1: - print(f"Step {step} loss: {loss}") - - booster.backward(loss, optimizer) - optimizer.step() - optimizer.zero_grad() - - performance_evaluator.on_step_end(**batch) - prof.step() - - performance_evaluator.on_fit_end() - coordinator.print_on_master(f"Max CUDA memory usage: {get_accelerator().max_memory_allocated()/1024**2:.2f} MB") - - -if __name__ == "__main__": - main() diff --git a/nlp/llm/deepseek_moe_7b/colossalai/data_utils.py b/nlp/llm/deepseek_moe_7b/colossalai/data_utils.py deleted file mode 100644 index 6b9e8ef28eb7f18468ca6949743032b7c239a4b0..0000000000000000000000000000000000000000 --- a/nlp/llm/deepseek_moe_7b/colossalai/data_utils.py +++ /dev/null @@ -1,124 +0,0 @@ -import json -import random -from typing import Iterator, Optional - -import numpy as np -import torch -from torch.distributed import ProcessGroup -from torch.distributed.distributed_c10d import _get_default_group -from torch.utils.data import DataLoader, Dataset, DistributedSampler - -from colossalai.accelerator import get_accelerator - - -class StatefulDistributedSampler(DistributedSampler): - def __init__( - self, - dataset: Dataset, - num_replicas: Optional[int] = None, - rank: Optional[int] = None, - shuffle: bool = True, - seed: int = 0, - drop_last: bool = False, - ) -> None: - super().__init__(dataset, num_replicas, rank, shuffle, seed, drop_last) - self.start_index: int = 0 - - def __iter__(self) -> Iterator: - iterator = super().__iter__() - indices = list(iterator) - indices = indices[self.start_index :] - return iter(indices) - - def __len__(self) -> int: - return self.num_samples - self.start_index - - def 
set_start_index(self, start_index: int) -> None: - self.start_index = start_index - - -def prepare_dataloader( - dataset, - batch_size, - shuffle=False, - seed=1024, - drop_last=False, - pin_memory=False, - num_workers=0, - process_group: Optional[ProcessGroup] = None, - **kwargs, -): - r""" - Prepare a dataloader for distributed training. The dataloader will be wrapped by - `torch.utils.data.DataLoader` and `StatefulDistributedSampler`. - - - Args: - dataset (`torch.utils.data.Dataset`): The dataset to be loaded. - shuffle (bool, optional): Whether to shuffle the dataset. Defaults to False. - seed (int, optional): Random worker seed for sampling, defaults to 1024. - add_sampler: Whether to add ``DistributedDataParallelSampler`` to the dataset. Defaults to True. - drop_last (bool, optional): Set to True to drop the last incomplete batch, if the dataset size - is not divisible by the batch size. If False and the size of dataset is not divisible by - the batch size, then the last batch will be smaller, defaults to False. - pin_memory (bool, optional): Whether to pin memory address in CPU memory. Defaults to False. - num_workers (int, optional): Number of worker threads for this dataloader. Defaults to 0. - kwargs (dict): optional parameters for ``torch.utils.data.DataLoader``, more details could be found in - `DataLoader `_. - - Returns: - :class:`torch.utils.data.DataLoader`: A DataLoader used for training or testing. - """ - _kwargs = kwargs.copy() - process_group = process_group or _get_default_group() - sampler = StatefulDistributedSampler( - dataset, num_replicas=process_group.size(), rank=process_group.rank(), shuffle=shuffle - ) - - # Deterministic dataloader - def seed_worker(worker_id): - worker_seed = seed - np.random.seed(worker_seed) - torch.manual_seed(worker_seed) - random.seed(worker_seed) - - return DataLoader( - dataset, - batch_size=batch_size, - sampler=sampler, - worker_init_fn=seed_worker, - drop_last=drop_last, - pin_memory=pin_memory, - num_workers=num_workers, - **_kwargs, - ) - - -def load_json(file_path: str): - with open(file_path, "r") as f: - return json.load(f) - - -def save_json(data, file_path: str): - with open(file_path, "w") as f: - json.dump(data, f, indent=4) - - -class RandomDataset(Dataset): - def __init__(self, num_samples: int = 1000, max_length: int = 2048, vocab_size: int = 32000): - self.num_samples = num_samples - self.max_length = max_length - self.input_ids = torch.randint( - 0, vocab_size, (num_samples, max_length), device=get_accelerator().get_current_device() - ) - self.attention_mask = torch.ones_like(self.input_ids) - - def __len__(self): - return self.num_samples - - def __getitem__(self, idx): - return { - "input_ids": self.input_ids[idx], - "attention_mask": self.attention_mask[idx], - "labels": self.input_ids[idx], - } diff --git a/nlp/llm/deepseek_moe_7b/colossalai/deepseek_moe_7b_pretrain.sh b/nlp/llm/deepseek_moe_7b/colossalai/deepseek_moe_7b_pretrain.sh deleted file mode 100644 index 4a6e3bfa177cb39f74cff231f99dd30258ef72ca..0000000000000000000000000000000000000000 --- a/nlp/llm/deepseek_moe_7b/colossalai/deepseek_moe_7b_pretrain.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/bin/bash -# Copyright (c) 2024, Shanghai Iluvatar CoreX Semiconductor Co., Ltd. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. 
You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -colossalai run --nproc_per_node 16 benchmark.py -c 7b -g -b 16 --tp 1 --pp 4 --num_steps 50 --model_path /home/model_zoos/nlp/deepseek-moe-16b-base diff --git a/nlp/llm/deepseek_moe_7b/colossalai/model_utils.py b/nlp/llm/deepseek_moe_7b/colossalai/model_utils.py deleted file mode 100644 index 63569bc61143b9abbba424ea312359c8ce85bbca..0000000000000000000000000000000000000000 --- a/nlp/llm/deepseek_moe_7b/colossalai/model_utils.py +++ /dev/null @@ -1,32 +0,0 @@ -from contextlib import contextmanager - -import torch -import torch.nn as nn - - -@contextmanager -def low_precision_init(target_dtype: torch.dtype = torch.float16): - dtype = torch.get_default_dtype() - try: - torch.set_default_dtype(target_dtype) - yield - finally: - torch.set_default_dtype(dtype) - - -def get_model_numel(model: nn.Module) -> int: - return sum(p.numel() for p in model.parameters()) - - -def format_numel_str(numel: int) -> str: - B = 1024**3 - M = 1024**2 - K = 1024 - if numel >= B: - return f"{numel / B:.2f} B" - elif numel >= M: - return f"{numel / M:.2f} M" - elif numel >= K: - return f"{numel / K:.2f} K" - else: - return f"{numel}" diff --git a/nlp/llm/deepseek_moe_7b/colossalai/performance_evaluator.py b/nlp/llm/deepseek_moe_7b/colossalai/performance_evaluator.py deleted file mode 100644 index 65c7e49a2f03b7b7ae1c8d79e0efad24b836c1e9..0000000000000000000000000000000000000000 --- a/nlp/llm/deepseek_moe_7b/colossalai/performance_evaluator.py +++ /dev/null @@ -1,172 +0,0 @@ -from time import time -from typing import Optional - -import torch -import torch.distributed as dist -from torch import Tensor -from torch.profiler import ProfilerActivity, profile, schedule, tensorboard_trace_handler - -from colossalai.cluster import DistCoordinator - - -def divide(x: float, y: float) -> float: - if y == 0: - return float("inf") - elif y == float("inf"): - return float("nan") - return x / y - - -@torch.no_grad() -def all_reduce_mean(x: float, world_size: int) -> float: - if world_size == 1: - return x - - # Use CPU tensor to avoid OOM/weird NCCl error - gloo_group = dist.new_group(backend="gloo") - tensor = torch.tensor([x], device="cpu") - dist.all_reduce(tensor, group=gloo_group) - tensor = tensor / world_size - return tensor.item() - - -def get_profile_context(enable_flag, warmup_steps, active_steps, save_dir, nsys=False): - class DummyProfiler: - def __init__(self): - self.step_number = 0 - - def step(self): - self.step_number += 1 - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - pass - - class NsysProfiler: - def __init__(self, warmup_steps, active_steps): - self.step_number = 0 - self.warmup_steps = warmup_steps - self.active_steps = active_steps - - def step(self): - if self.step_number == self.warmup_steps: - torch.cuda.cudart().cudaProfilerStart() - elif self.step_number == self.warmup_steps + self.active_steps: - torch.cuda.cudart().cudaProfilerStop() - self.step_number += 1 - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - pass - - if enable_flag: - if nsys: - return NsysProfiler(warmup_steps, active_steps) - 
- return profile( - activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA], - schedule=schedule(wait=0, warmup=warmup_steps, active=active_steps), - on_trace_ready=tensorboard_trace_handler(save_dir), - record_shapes=True, - profile_memory=True, - with_stack=True, - ) - else: - return DummyProfiler() - - -class Timer: - def __init__(self) -> None: - self.start_time: Optional[float] = None - self.duration: float = 0.0 - - def start(self) -> None: - self.start_time = time() - - def end(self) -> None: - assert self.start_time is not None - self.duration += time() - self.start_time - self.start_time = None - - def reset(self) -> None: - self.duration = 0.0 - - -class PerformanceEvaluator: - """ - Callback for valuate the performance of the model. - Args: - actor_num_params: The number of parameters of the actor model. - critic_num_params: The number of parameters of the critic model. - initial_model_num_params: The number of parameters of the initial model. - reward_model_num_params: The number of parameters of the reward model. - enable_grad_checkpoint: Whether to enable gradient checkpointing. - ignore_episodes: The number of episodes to ignore when calculating the performance. - """ - - def __init__( - self, - model_numel: int, - num_layers: int, - hidden_size: int, - vocab_size: int, - enable_grad_checkpoint: bool = False, - ignore_steps: int = 0, - dp_world_size: Optional[int] = None, - ) -> None: - self.model_numel = model_numel - self.enable_grad_checkpoint = enable_grad_checkpoint - self.ignore_steps = ignore_steps - self.num_layers = num_layers - self.hidden_size = hidden_size - self.vocab_size = vocab_size - - self.coordinator = DistCoordinator() - self.dp_world_size = dp_world_size or self.coordinator.world_size - self.disable: bool = False - self.timer = Timer() - self.num_samples: int = 0 - self.flop_megatron = 0 - self.flop: int = 0 - - def on_step_start(self, step: int) -> None: - self.disable = self.ignore_steps > 0 and step < self.ignore_steps - if self.disable: - return - # get_accelerator().synchronize() - self.timer.start() - - def on_step_end(self, input_ids: Tensor, **kwargs) -> None: - if self.disable: - return - # get_accelerator().synchronize() - self.timer.end() - - batch_size, seq_len = input_ids.shape - - self.num_samples += batch_size - checkpoint_activations_factor = 3 + int(self.enable_grad_checkpoint) - self.flop_megatron += ( - 24 * checkpoint_activations_factor * batch_size * seq_len * self.num_layers * (self.hidden_size**2) - ) * ( - 1.0 + (seq_len / (6.0 * self.hidden_size)) + (self.vocab_size / (16.0 * self.num_layers * self.hidden_size)) - ) - self.flop += batch_size * seq_len * self.model_numel * 2 * (3 + int(self.enable_grad_checkpoint)) - - def on_fit_end(self) -> None: - avg_duration = all_reduce_mean(self.timer.duration, self.coordinator.world_size) - avg_throughput = self.num_samples * self.dp_world_size / (avg_duration + 1e-12) - mp_world_size = self.coordinator.world_size // self.dp_world_size - avg_tflops_per_gpu_megatron = self.flop_megatron / 1e12 / (avg_duration + 1e-12) / mp_world_size - avg_tflops_per_gpu = self.flop / 1e12 / (avg_duration + 1e-12) / mp_world_size - self.coordinator.print_on_master( - f"num_samples: {self.num_samples}, dp_world_size: {self.dp_world_size}, flop_megatron: {self.flop_megatron}, flop: {self.flop}, avg_duration: {avg_duration}, " - f"avg_throughput: {avg_throughput}" - ) - self.coordinator.print_on_master( - f"Throughput: {avg_throughput:.2f} samples/sec, TFLOPS per GPU by Megatron: 
{avg_tflops_per_gpu_megatron:.2f}, TFLOPS per GPU: {avg_tflops_per_gpu:.2f}"
-        )
diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/README.md b/nlp/llm/llama3_8b/colossalai/README.md
similarity index 56%
rename from nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/README.md
rename to nlp/llm/llama3_8b/colossalai/README.md
index 7fdca2d6b9c9d5b04955cbbed4351f8de11cc729..b154fb77d0fc8737d28853200f1dd6ef2c98ba22 100644
--- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/README.md
+++ b/nlp/llm/llama3_8b/colossalai/README.md
@@ -4,27 +4,33 @@

 The Llama 3 Herd of models natively supports multilinguality, coding, reasoning, and tool usage. The largest model is a dense Transformer with 405B parameters and a context window of up to 128K tokens; Llama 3 8B is the smallest model of the Llama 3 Herd.

-## Step 1: Installation
+## Step 1: Preparing checkpoints

-Firstly, you should ensure that the corresponding version of ColossalAI has been installed in the iluvatar environment. Then install applications as follows:
+Get the "Meta-Llama-3-8B" model weights and config files from ModelScope or another source, and move them to "/home/model_zoos/".
+One recommended link: "".

 ```sh
-cd ColossalAI/applications/Colossal-LLaMA
-pip3 install -e .
+mkdir -p /home/model_zoos/
+mv /Meta-Llama-3-8B /home/model_zoos/
+
+wget http://files.deepspark.org.cn:880/deepspark/tokenizer.model
+cp tokenizer.model /home/model_zoos/Meta-Llama-3-8B
 ```

-## Step 2: Preparing datasets and checkpoints
+## Step 2: Installation and preparing datasets

-```sh
-pip3 install modelscope
-python3 ./get_Meta_LLaMA_8B.py
-mkdir -p /home/model_zoos/nlp
-mv ~/.cache/modelscope/hub/LLM-Research/Meta-Llama-3-8B /home/model_zoos/nlp
+Ensure that the corresponding version of ColossalAI is installed in the Iluvatar environment, then install the application as follows:

-wget http://files.deepspark.org.cn:880/deepspark/tokenizer.model
-cp tokenizer.model /home/model_zoos/nlp/Meta-Llama-3-8B
+```sh
+git clone -b v0.4.4 https://github.com/hpcaitech/ColossalAI.git --depth=1
+cd ColossalAI
+cp -rf /toolbox/ColossalAI/v0.4.4/patches/* ./
+cd applications/Colossal-LLaMA
+pip3 install -e .

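+# Optional sanity check before moving on (a sketch, assuming the install exposes
+# `colossalai.__version__` and that `pip3 install -e .` registers the `colossal_llama` package):
+python3 -c "import colossalai; print(colossalai.__version__)"
+python3 -c "import colossal_llama"
+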
+# preparing datasets wget http://files.deepspark.org.cn:880/deepspark/school_math_0.25M.jsonl +mkdir -p dataset/school_math/convert/ mv school_math_0.25M.jsonl dataset/school_math bash ./prepare_sft_dataset.sh llama3 ``` diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/__init__.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/__init__.py deleted file mode 100644 index 56fafa58b3f43decb7699b93048b8b87e0f695aa..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/dataset/__init__.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/dataset/__init__.py deleted file mode 100644 index 56fafa58b3f43decb7699b93048b8b87e0f695aa..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/dataset/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/dataset/conversation.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/dataset/conversation.py deleted file mode 100644 index 8ec9c848b2c838e31d6f6ccc4653ae246631290b..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/dataset/conversation.py +++ /dev/null @@ -1,106 +0,0 @@ -# Copyright 2023 lm-sys@FastChat -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import dataclasses -from enum import Enum, auto -from typing import List - - -class SeparatorStyle(Enum): - ADD_BOS_EOS_TOKEN = auto() - - -@dataclasses.dataclass -class Conversation: - system: str - roles: List[str] - messages: List[List[str]] - offset: int - sep_style: SeparatorStyle - seps: List[str] - - def clear(self): - self.messages = [] - - def get_prompt(self, length: int = None): - if length is None: - length = len(self.messages) - - if self.sep_style == SeparatorStyle.ADD_BOS_EOS_TOKEN: - ret = self.system - for role, message in self.messages[0:length]: - if message: - ret += role + ": " + self.seps[0] + message + self.seps[1] - else: - ret += role + ": " + self.seps[0] - return ret - else: - raise ValueError(f"Invalid style: {self.sep_style}") - - def save_prompt(self): - if self.sep_style == SeparatorStyle.ADD_BOS_EOS_TOKEN: - ret = self.system - for role, message in self.messages: - if message: - ret += role + ": " + self.seps[0] + message + self.seps[1] + "\n" - else: - ret += role + ": " + self.seps[0] - return ret - else: - raise ValueError(f"Invalid style: {self.sep_style}") - - def append_message(self, role, message): - self.messages.append([role, message]) - - def copy(self): - return Conversation( - system=self.system, - roles=self.roles, - messages=[[x, y] for x, y in self.messages], - offset=self.offset, - sep_style=self.sep_style, - seps=self.seps, - ) - - def dict(self): - return { - "system": self.system, - "roles": self.roles, - "messages": self.messages, - "offset": self.offset, - "seps": self.seps, - } - - -LLaMA2_Conv = Conversation( - system="A chat between a curious human and an artificial intelligence assistant. " - "The assistant gives helpful, detailed, and polite answers to the human's questions.\n\n", - roles=("Human", "Assistant"), - messages=[], - offset=0, - sep_style=SeparatorStyle.ADD_BOS_EOS_TOKEN, - seps=["", ""], -) - -LLaMA3_Conv = Conversation( - system="A chat between a curious human and an artificial intelligence assistant. 
" - "The assistant gives helpful, detailed, and polite answers to the human's questions.\n\n", - roles=("Human", "Assistant"), - messages=[], - offset=0, - sep_style=SeparatorStyle.ADD_BOS_EOS_TOKEN, - seps=["<|begin_of_text|>", "<|end_of_text|>"], -) - -default_conversation = LLaMA3_Conv diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/dataset/dummy_dataset.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/dataset/dummy_dataset.py deleted file mode 100644 index 3175159fcd3785b8f7d7bb66c1428c76a88e5c8b..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/dataset/dummy_dataset.py +++ /dev/null @@ -1,24 +0,0 @@ -import torch -from torch.utils.data import Dataset - -from colossalai.accelerator import get_accelerator - - -class RandomDataset(Dataset): - def __init__(self, num_samples: int = 1000, max_length: int = 2048, vocab_size: int = 32000): - self.num_samples = num_samples - self.max_length = max_length - self.input_ids = torch.randint( - 0, vocab_size, (num_samples, max_length), device=get_accelerator().get_current_device() - ) - self.attention_mask = torch.ones_like(self.input_ids) - - def __len__(self): - return self.num_samples - - def __getitem__(self, idx): - return { - "input_ids": self.input_ids[idx], - "attention_mask": self.attention_mask[idx], - "labels": self.input_ids[idx], - } diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/dataset/loader.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/dataset/loader.py deleted file mode 100644 index b1807260c8a0660d505e46c50333578a0291136a..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/dataset/loader.py +++ /dev/null @@ -1,182 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# Copyright (c) 2024, Shanghai Iluvatar CoreX Semiconductor Co., Ltd. -# All Rights Reserved. - -import os -from dataclasses import dataclass -from typing import Dict, Iterator, List, Optional, Sequence, Union - -import torch -import torch.nn.functional as F -from datasets import Dataset as HFDataset -from datasets import dataset_dict, load_from_disk -from torch.utils.data import ConcatDataset, Dataset, DistributedSampler -from transformers.tokenization_utils import PreTrainedTokenizer - -DatasetType = Union[Dataset, ConcatDataset, dataset_dict.Dataset] -PathType = Union[str, os.PathLike] - - -def load_tokenized_dataset( - dataset_parrent_path: Union[PathType, List[PathType]], mode: str = "train" -) -> Optional[DatasetType]: - """ - Load pre-tokenized dataset. - Each instance of dataset is a dictionary with - `{'input_ids': List[int], 'labels': List[int], sequence: str}` format. 
- """ - mode_map = {"train": "train", "dev": "validation", "test": "test"} - assert mode in tuple(mode_map), f"Unsupported mode {mode}, it must be in {tuple(mode_map)}" - if dataset_parrent_path: - dataset_paths=[] - for dirname in os.listdir(dataset_parrent_path): - dataset_paths.append(os.path.join(dataset_parrent_path, dirname)) - - # if isinstance(dataset_paths, (str, os.PathLike)): - # dataset_paths = [dataset_paths] - - datasets = [] # `List[datasets.dataset_dict.Dataset]` - for ds_path in dataset_paths: - ds_path = os.path.abspath(ds_path) - assert os.path.exists(ds_path), f"Not existed file path {ds_path}" - ds_dict = load_from_disk(dataset_path=ds_path, keep_in_memory=False) - if isinstance(ds_dict, HFDataset): - datasets.append(ds_dict) - else: - if mode_map[mode] in ds_dict: - datasets.append(ds_dict[mode_map[mode]]) - if len(datasets) == 0: - return None - if len(datasets) == 1: - return datasets.pop() - return ConcatDataset(datasets=datasets) - - -@dataclass -class DataCollatorForSupervisedDataset(object): - """ - Collate instances for supervised dataset. - Each instance is a tokenized dictionary with fields - `input_ids`(List[int]), `labels`(List[int]) and `sequence`(str). - """ - - tokenizer: PreTrainedTokenizer - max_length: int = 4096 - ignore_index: int = -100 - padding: str = "max_length" - - def __call__(self, instances: Sequence[Dict[str, List[int]]]) -> Dict[str, torch.Tensor]: - """ - - Args: - instances (`Sequence[Dict[str, List[int]]]`): - Mini-batch samples, each sample is stored in an individual dictionary. - - Returns: - (`Dict[str, torch.Tensor]`): Contains the following `torch.Tensor`: - `input_ids`: `torch.Tensor` of shape (bsz, max_len); - `attention_mask`: `torch.BoolTensor` of shape (bsz, max_len); - `labels`: `torch.Tensor` of shape (bsz, max_len), which contains `IGNORE_INDEX`. 
- """ - assert isinstance(self.tokenizer.pad_token_id, int) and self.tokenizer.pad_token_id >= 0, ( - f"`{self.tokenizer.__class__.__name__}.pad_token_id` must be a valid non-negative integer index value, " - f"but now `{self.tokenizer.pad_token_id}`" - ) - - # `List[torch.Tensor]` - batch_input_ids = [ - ( - torch.LongTensor(instance["input_ids"][: self.max_length]) - if len(instance["input_ids"]) > self.max_length - else torch.LongTensor(instance["input_ids"][:-1]) - ) - for instance in instances - ] - batch_labels = [ - ( - torch.LongTensor(instance["labels"][1: self.max_length+1]) - if len(instance["labels"]) > self.max_length - else torch.LongTensor(instance["labels"])[1:] - ) - for instance in instances - ] - - if self.tokenizer.padding_side == "right": - input_ids = torch.nn.utils.rnn.pad_sequence( - sequences=batch_input_ids, - batch_first=True, - padding_value=self.tokenizer.pad_token_id, - ) # (bsz, max_len) - labels = torch.nn.utils.rnn.pad_sequence( - sequences=batch_labels, - batch_first=True, - padding_value=self.ignore_index, - ) # (bsz, max_len) - if self.padding == "max_length": - # pad to max - to_pad = self.max_length - input_ids.size(1) - input_ids = F.pad(input_ids, (0, to_pad), value=self.tokenizer.pad_token_id) - labels = F.pad(labels, (0, to_pad), value=self.ignore_index) - elif self.tokenizer.padding_side == "left": - reversed_input_ids = [seq.flip(dims=(0,)) for seq in batch_input_ids] - reversed_input_ids = torch.nn.utils.rnn.pad_sequence( - sequences=reversed_input_ids, - batch_first=True, - padding_value=self.tokenizer.pad_token_id, - ) # (bsz, max_len) - input_ids = torch.flip(reversed_input_ids, dims=(1,)) # (bsz, max_len) - reversed_labels = [seq.flip(dims=(0,)) for seq in batch_labels] - reversed_labels = torch.nn.utils.rnn.pad_sequence( - sequences=reversed_labels, - batch_first=True, - padding_value=self.ignore_index, - ) # (bsz, max_len) - labels = torch.flip(reversed_labels, dims=(1,)) # (bsz, max_len) - else: - raise RuntimeError( - f"`{self.tokenizer.__class__.__name__}.padding_side` can only be `left` or `right`, " - f"but now `{self.tokenizer.padding_side}`" - ) - - attention_mask = input_ids.ne(self.tokenizer.pad_token_id) # `torch.BoolTensor`, (bsz, max_len) - - return dict(input_ids=input_ids, attention_mask=attention_mask, labels=labels) - - -class StatefulDistributedSampler(DistributedSampler): - """ - Stateful distributed sampler for multi-stage training. 
- """ - - def __init__( - self, - dataset: DatasetType, - num_replicas: Optional[int] = None, - rank: Optional[int] = None, - shuffle: bool = True, - seed: int = 0, - drop_last: bool = False, - ) -> None: - super().__init__( - dataset=dataset, - num_replicas=num_replicas, - rank=rank, - shuffle=shuffle, - seed=seed, - drop_last=drop_last, - ) - self.start_index = 0 - - def __iter__(self) -> Iterator: - iterator = super().__iter__() - indices = list(iterator) - indices = indices[self.start_index :] - return iter(indices) - - def __len__(self) -> int: - return self.num_samples - self.start_index - - def set_start_index(self, start_index: int) -> None: - self.start_index = start_index diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/dataset/spliced_and_tokenized_dataset.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/dataset/spliced_and_tokenized_dataset.py deleted file mode 100644 index 505bed8c885a7d2950b1d1cb91a1f907af028cf3..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/dataset/spliced_and_tokenized_dataset.py +++ /dev/null @@ -1,366 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# Copyright (c) 2024, Shanghai Iluvatar CoreX Semiconductor Co., Ltd. -# All Rights Reserved. - -""" -Splicing multiple pre-tokenized sequence data points -""" - -import bisect -import random -import warnings -from copy import deepcopy -from typing import Any, Callable, Dict, Iterable, List, Tuple, Union - -from datasets import dataset_dict -from torch.utils.data import ConcatDataset, Dataset, IterableDataset -from transformers import AutoTokenizer -from transformers.models.llama.tokenization_llama import LlamaTokenizer -from transformers.tokenization_utils import PreTrainedTokenizer - -from colossalai.logging import get_dist_logger - -from .conversation import Conversation, default_conversation - -logger = get_dist_logger() - -IGNORE_INDEX = -100 - -DSType = Union[Dataset, ConcatDataset, dataset_dict.Dataset] - -def bisect_right(a, x, lo=0, hi=None, *, key=None): - """Return the index where to insert item x in list a, assuming a is sorted. - - The return value i is such that all e in a[:i] have e <= x, and all e in - a[i:] have e > x. So if x already appears in the list, a.insert(i, x) will - insert just after the rightmost x already there. - - Optional args lo (default 0) and hi (default len(a)) bound the - slice of a to be searched. - """ - - if lo < 0: - raise ValueError('lo must be non-negative') - if hi is None: - hi = len(a) - # Note, the comparison uses "<" to match the - # __lt__() logic in list.sort() and in heapq. 
- if key is None: - while lo < hi: - mid = (lo + hi) // 2 - if x < a[mid]: - hi = mid - else: - lo = mid + 1 - else: - while lo < hi: - mid = (lo + hi) // 2 - if x < key(a[mid]): - hi = mid - else: - lo = mid + 1 - return lo - -def supervised_tokenize_pretrain_webtext( - data_point: Dict[str, str], tokenizer: LlamaTokenizer, ignore_index: int = None, max_length: int = 4096 -) -> Dict[str, Union[int, str, List[int]]]: - - """ - A tokenization function to tokenize an original pretraining data point as following: - {"id": 0, "text": "Beijing, the capital of the People's Republic of China, ...", "length": 124,"ended": False} - """ - assert tokenizer.add_bos_token is False and tokenizer.add_eos_token is False, ( - "Initially set `tokenizer.add_bos_token` and `tokenizer.add_eos_token` to False, " - "add and manually later" - ) - - text = data_point["text"] - sequence_text = tokenizer.bos_token + text + tokenizer.eos_token - sequence_input_ids = tokenizer(sequence_text)["input_ids"] - sequence_labels = deepcopy(sequence_input_ids) - if len(sequence_input_ids) > max_length: - sequence_input_ids = sequence_input_ids[:max_length] - sequence_labels = sequence_labels[:max_length] - - return dict( - input_ids=sequence_input_ids, - labels=sequence_labels, - seq_length=len(sequence_input_ids), - ) - - - -def supervised_tokenize_pretrain( - data_point: Dict[str, str], tokenizer: LlamaTokenizer, ignore_index: int = None, max_length: int = 4096 -) -> Dict[str, Union[int, str, List[int]]]: - """ - A tokenization function to tokenize an original pretraining data point as following: - {"source": "", "target": "Beijing, the capital of the People's Republic of China, ...", "category": "geography"} - """ - assert tokenizer.add_bos_token is False and tokenizer.add_eos_token is False, ( - "Initially set `tokenizer.add_bos_token` and `tokenizer.add_eos_token` to False, " - "add and manually later" - ) - if ignore_index is None: - ignore_index = IGNORE_INDEX - - source_text = data_point["source"] # `str` - target_text = data_point["target"] # `str` - is_null_source = len(source_text) == 0 - - source_text = tokenizer.bos_token + source_text - target_text += tokenizer.eos_token - sequence_text = source_text + target_text - - tokenized = tokenizer([source_text, sequence_text])["input_ids"] - sequence_input_ids = tokenized[1] - sequence_labels = deepcopy(sequence_input_ids) - - source_length = len(tokenized[0]) - if not is_null_source: - sequence_labels[:source_length] = [ignore_index for _ in range(source_length)] - - # sequence truncation. 
- if len(sequence_input_ids) > max_length: - sequence_input_ids = sequence_input_ids[:max_length] - sequence_labels = sequence_labels[:max_length] - - return dict( - input_ids=sequence_input_ids, - labels=sequence_labels, - seq_length=len(sequence_input_ids), - seq_category=data_point["category"], - ) - - -def supervised_tokenize_sft( - data_point: Dict[str, str], - tokenizer: AutoTokenizer, - conversation_template: Conversation = default_conversation, - ignore_index: int = None, - max_length: int = 4096, -) -> Dict[str, Union[int, str, List[int]]]: - """ - A tokenization function to tokenize an original supervised data point as following: - {"messages": [{"from": "human", "content": "xxx"}, {"from": "assistant", "content": "xxx"}]} - """ - assert tokenizer.add_bos_token is False and tokenizer.add_eos_token is False, ( - "Initially set `tokenizer.add_bos_token` and `tokenizer.add_eos_token` to False, " - "add and manually later" - ) - - assert ( - tokenizer.bos_token == conversation_template.seps[0] and tokenizer.eos_token == conversation_template.seps[1] - ), "`bos_token` and `eos_token` should be the same with `conversation_template.seps`." - - if ignore_index is None: - ignore_index = IGNORE_INDEX - - messages = data_point["messages"] - template = deepcopy(conversation_template) - template.messages = [] - - for mess in messages: - from_str = mess["from"] - if from_str.lower() == "human": - from_str = template.roles[0] - elif from_str.lower() == "assistant": - from_str = template.roles[1] - else: - raise ValueError(f"Unsupported role {from_str.lower()}") - - template.append_message(from_str, mess["content"]) - - if len(template.messages) % 2 != 0: - template.messages = template.messages[0:-1] - - # `target_turn_index` is the number of turns which exceeds `max_length - 1` for the first time. - turns = [i for i in range(1, len(messages) // 2 + 1)] - target_turn_index = bisect_right( - turns, - max_length - 1, - key=lambda x: len(tokenizer([template.get_prompt(2 * x)], add_special_tokens=False)["input_ids"][0]), - ) - - # The tokenized length for first turn already exceeds `max_length - 1`. - if target_turn_index - 1 < 0: - return dict( - input_ids=None, - labels=None, - inputs_decode=None, - labels_decode=None, - seq_length=None, - seq_category=None, - ) - - target_turn = turns[target_turn_index - 1] - prompt = template.get_prompt(2 * target_turn) - tokenized = tokenizer([prompt], add_special_tokens=False)["input_ids"][0] - - template.messages = template.messages[0 : 2 * target_turn] - - starts = [] - ends = [] - gpt_bos = False if template.messages[0][0] == template.roles[0] else True - gpt_eos = False if template.messages[0][0] == template.roles[0] else True - - for i, token_id in enumerate(tokenized): - if token_id == tokenizer.bos_token_id: - if gpt_bos: - starts.append(i) - gpt_bos = not gpt_bos - elif token_id == tokenizer.eos_token_id: - if gpt_eos: - ends.append(i) - gpt_eos = not gpt_eos - - if len(starts) != target_turn or len(ends) != target_turn: - logger.info( - "Please check whether the tokenizer add additional `bos_token` and `eos_token`.\n\nOr the original message contains `bos_token` or `eos_token`." 
- ) - return dict( - input_ids=None, - labels=None, - inputs_decode=None, - labels_decode=None, - seq_length=None, - seq_category=None, - ) - - tokenized = [tokenizer.bos_token_id] + tokenized - labels = [ignore_index] * len(tokenized) - for start, end in zip(starts, ends): - labels[start + 1 : end + 2] = tokenized[start + 1 : end + 2] - - labels_decode = deepcopy(labels) - for i, z in enumerate(labels_decode): - if z == ignore_index: - labels_decode[i] = tokenizer.unk_token_id - - # `inputs_decode` and `labels_decode` can be used to check whether the tokenization method is true. - return dict( - input_ids=tokenized, - labels=labels, - inputs_decode=tokenizer.decode(tokenized), - labels_decode=tokenizer.decode(labels_decode), - seq_length=len(tokenized), - seq_category=data_point["category"] if "category" in data_point else "None", - ) - - -class ClosedToConstantLengthSplicedDataset(IterableDataset): - """ - Define an iterable dataset that returns a (close to) constant length data point spliced from multiple - original independent (pre-tokenized) data points. - """ - - def __init__( - self, - dataset: DSType, - tokenizer: PreTrainedTokenizer, - max_length: int = 4096, - num_packed_sequences: int = 8, - fetch_sequence_func: Callable[[Any], Tuple[List[int], List[int]]] = None, - input_ids_field: str = "input_ids", - labels_field: str = "labels", - infinite: bool = False, - shuffle: bool = True, - error_strict: bool = False, - ) -> None: - self.tokenizer = tokenizer - self.dataset = dataset - self.max_length = max_length - self.infinite = infinite - self.max_buffer_size = max_length * num_packed_sequences # e.g., 4096 * 16 - self.shuffle = shuffle - - # Callable[[Dict[str, Any]], Tuple[List[int], List[int]]], - # A function that fetch sequence input_ids and labels from the original data point - if fetch_sequence_func is None: - self.fetch_sequence_func = lambda data_point: (data_point[input_ids_field], data_point[labels_field]) - else: - self.fetch_sequence_func = fetch_sequence_func - self.input_ids_field = input_ids_field - self.labels_field = labels_field - - self.error_strict = error_strict - self.current_size = 0 # `int`, current packed data size. - - def __len__(self) -> int: - return len(self.dataset) - - def __iter__(self) -> Iterable[Dict[str, List[int]]]: - iterator = iter(self.dataset) - more_data_points = True - while more_data_points is True: - buffer, buffer_len = [], 0 - while True: - # ending condition. - if buffer_len >= self.max_buffer_size: - break - try: - # `Tuple[List[int], List[int]]` - seq_input_ids, seq_labels = self.fetch_sequence_func(next(iterator)) - buffer.append({self.input_ids_field: seq_input_ids, self.labels_field: seq_labels}) - buffer_len += len(buffer[-1][self.input_ids_field]) - except StopIteration: - if self.infinite is True: - iterator = iter(self.dataset) - warnings.warn("The dataset reached end and the iterator is reset to the start.") - else: - more_data_points = False - break - examples = [] # `List[Dict[str, List[int]]]`, save buffered spliced data points. - spliced_input_ids, spliced_labels = [], [] # `List[int]`, `List[int]` - for i, data_point in enumerate(buffer): - # TODO(2023-09-18) check errors for each unspliced tokenized data point - seq_input_ids = data_point[self.input_ids_field] - seq_labels = data_point[self.labels_field] - # Handle special case: - # If the length of an original data point (i.e., input_ids length of a data point before splicing) - # exceeds `max_length`, truncate it. 
- if len(seq_input_ids) > self.max_length: - truncated_seq_input_ids = seq_input_ids[: self.max_length] - truncated_label_ids = seq_labels[: self.max_length] - if set(truncated_label_ids) == {IGNORE_INDEX}: - if self.error_strict is True: - raise ValueError( - f"Find an out-of-bounds length({len(seq_input_ids)}) data point " - f"with all label values as {IGNORE_INDEX}." - ) - else: - warnings.warn(f"Filter an error truncated data point (labels all {IGNORE_INDEX})") - continue # Skip the current error data point. - spliced_data_point = { - self.input_ids_field: truncated_seq_input_ids, - self.labels_field: truncated_label_ids, - } - examples.append(spliced_data_point) - warnings.warn("Find a data point to be truncated.") - continue - - # Pre action judgment. - if len(spliced_input_ids) + len(seq_input_ids) > self.max_length: - spliced_input_ids.extend(seq_input_ids) - spliced_labels.extend(seq_labels) - spliced_data_point = { - self.input_ids_field: spliced_input_ids[:self.max_length], - self.labels_field: spliced_labels[:self.max_length], - } # `Dict[str, List[int]]` - # Update. - spliced_input_ids, spliced_labels = [], [] - examples.append(spliced_data_point) - else: - spliced_input_ids.extend(seq_input_ids) - spliced_labels.extend(seq_labels) - # For residual spliced data point at the end of the data set - if self.infinite is False and more_data_points is False and len(spliced_input_ids) > 0: - examples.append({self.input_ids_field: spliced_input_ids, self.labels_field: spliced_labels}) - if self.shuffle: - random.shuffle(examples) - for spliced_data_point in examples: - # TODO(2023-09-18): check errors for each spliced tokenized data point. - self.current_size += 1 - yield spliced_data_point diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/model/init_model.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/model/init_model.py deleted file mode 100644 index f61291f35d04d473e41ea4864b6072c223c7c528..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/model/init_model.py +++ /dev/null @@ -1,110 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -Initialize new model with updated tokenizer by calculating the mean values from original model -""" -import argparse - -import numpy as np -import torch -from transformers import LlamaForCausalLM, LlamaTokenizer - -from colossalai.logging import get_dist_logger - -logger = get_dist_logger() - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument( - "--source_model_and_tokenizer_path", - type=str, - required=True, - default=None, - help="Source path of model & tokenizer", - ) - parser.add_argument("--target_tokenizer_path", type=str, required=True, default=None, help="Target tokenizer path") - parser.add_argument("--target_model_path", type=str, required=True, default=None, help="Target model path") - args = parser.parse_args() - - source_tokenizer = LlamaTokenizer.from_pretrained(args.source_model_and_tokenizer_path) - source_tokenizer.add_bos_token = False - source_tokenizer.add_eos_token = False - if source_tokenizer.pad_token is None: - source_tokenizer.pad_token = source_tokenizer.unk_token - source_vocab = source_tokenizer.get_vocab() - - target_tokenizer = LlamaTokenizer.from_pretrained(args.target_tokenizer_path) - target_tokenizer.add_bos_token = False - target_tokenizer.add_eos_token = False - if target_tokenizer.pad_token is None: - target_tokenizer.pad_token = 
target_tokenizer.unk_token - target_vocab = target_tokenizer.get_vocab() - target_inverted_vocab = {v: k for k, v in target_vocab.items()} - - assert len(target_vocab) > len( - source_vocab - ), f"Target vocab size({len(target_vocab)}) must be greater than source vocab size({len(source_vocab)})" - - gpu_device = torch.device("cuda:0") - cpu_device = torch.device("cpu") - - source_model = LlamaForCausalLM.from_pretrained(args.source_model_and_tokenizer_path) - source_model.eval() - source_model = source_model.to(gpu_device) - - source_input_embeddings = source_model.get_input_embeddings() - assert isinstance(source_input_embeddings, torch.nn.Embedding) - assert source_input_embeddings.weight.shape[0] == len(source_vocab) - source_input_embeddings.eval() - - source_output_embeddings = source_model.get_output_embeddings() - assert isinstance(source_output_embeddings, torch.nn.Linear) - assert source_output_embeddings.bias is None - assert source_output_embeddings.weight.shape[0] == len(source_vocab) - source_output_embeddings.eval() - - input_embeddings = source_input_embeddings.weight.cpu().detach().numpy() - output_embeddings = source_output_embeddings.weight.cpu().detach().numpy() - for i in range(len(source_vocab), len(target_vocab)): - if i % 500 == 0: - logger.info(f"processing {i}/{len(target_vocab)} target tokens") - target_token = target_inverted_vocab[i] - target_to_source_token_ids = torch.LongTensor(source_tokenizer([target_token])["input_ids"][0]) - target_to_source_token_ids = target_to_source_token_ids.to(gpu_device) - - target_to_source_input_embedding = ( - source_input_embeddings.weight[target_to_source_token_ids] - .mean(dim=0) - .unsqueeze(dim=0) - .cpu() - .detach() - .numpy() - ) - target_to_source_output_embedding = ( - source_output_embeddings.weight[target_to_source_token_ids] - .mean(dim=0) - .unsqueeze(dim=0) - .cpu() - .detach() - .numpy() - ) - - input_embeddings = np.concatenate((input_embeddings, target_to_source_input_embedding), axis=0) - output_embeddings = np.concatenate((output_embeddings, target_to_source_output_embedding), axis=0) - - source_model = source_model.to(cpu_device) - assert isinstance(source_model, LlamaForCausalLM) - - # expand - source_model.resize_token_embeddings(new_num_tokens=len(target_vocab)) - source_model.model.embed_tokens.weight.data = torch.Tensor(input_embeddings) - source_model.lm_head.weight.data = torch.Tensor(output_embeddings) - - source_model = source_model.half() - source_model.save_pretrained(save_directory=args.target_model_path) - - -if __name__ == "__main__": - main() diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/tokenizer/init_tokenizer.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/tokenizer/init_tokenizer.py deleted file mode 100644 index 439135503002fd992b243fd485836bf34b1963a4..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/tokenizer/init_tokenizer.py +++ /dev/null @@ -1,98 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -""" -Initialize new tokenizer for continual pre-training -""" - -import argparse -import json -import os -from typing import List, Union - -from sentencepiece import sentencepiece_model_pb2 as sp_pb2_model -from transformers.models.llama.tokenization_llama import LlamaTokenizer - -from colossalai.logging import get_dist_logger - -os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python" - -logger = get_dist_logger() - - -def 
expand_vocab_tokenizer( - source_tokenizer_dir: Union[str, os.PathLike], target_tokenizer_dir: Union[str, os.PathLike], new_tokens: List[str] -) -> None: - """Expand tokenizer for continue pre-training.""" - if os.path.exists(target_tokenizer_dir): - raise RuntimeError(f"Find existed directory {target_tokenizer_dir}") - - source_tokenizer = LlamaTokenizer.from_pretrained(source_tokenizer_dir) - logger.info(source_tokenizer) - source_sp_processor = source_tokenizer.sp_model - source_spm = sp_pb2_model.ModelProto() - source_spm.ParseFromString(source_sp_processor.serialized_model_proto()) - - logger.info(f"Source tokenizer size: {len(source_sp_processor)}") - - # Add new tokens to source tokenizer. - source_spm_tokens = set([p.piece for p in source_spm.pieces]) - for piece in new_tokens: - assert isinstance(piece, str), f"Invalid token({piece}) type {type(piece)}" - if piece in source_spm_tokens: - # Skip existed token. - continue - new_p = sp_pb2_model.ModelProto().SentencePiece() - new_p.piece = piece - new_p.score = 0 - source_spm.pieces.append(new_p) - logger.info(f"Expand vocab from {len(source_spm_tokens)} to {len(source_spm.pieces)}") - - # Save - os.makedirs(target_tokenizer_dir) - target_tokenizer_model_path = os.path.join(target_tokenizer_dir, "tokenizer.model") - with open(file=target_tokenizer_model_path, mode="wb") as fp: - fp.write(source_spm.SerializeToString()) - - target_tokenizer = LlamaTokenizer(vocab_file=target_tokenizer_model_path) - target_tokenizer.save_pretrained(save_directory=target_tokenizer_dir) - logger.info(f"Successfully save expand tokenizer to {target_tokenizer_dir}") - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument( - "--source_tokenizer_dir", type=str, required=True, default=None, help="Source tokenizer directory" - ) - parser.add_argument( - "--target_tokenizer_dir", type=str, required=True, default=None, help="Target tokenizer directory" - ) - parser.add_argument( - "--expand_tokens_file", - type=str, - required=True, - default=None, - help="Path of the file containing tokens to be extended", - ) - args = parser.parse_args() - - expand_tokens = [] - with open(file=args.expand_tokens_file, mode="r", encoding="utf-8") as fp_reader: - for line in fp_reader: - item = json.loads(line) - # e.g., {"piece": "你好"} - token = item["piece"] - if token in expand_tokens: - continue - expand_tokens.append(token) - expand_tokens.sort(key=lambda t: len(t), reverse=False) - - expand_vocab_tokenizer( - source_tokenizer_dir=args.source_tokenizer_dir, - target_tokenizer_dir=args.target_tokenizer_dir, - new_tokens=expand_tokens, - ) - - -if __name__ == "__main__": - main() diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/__init__.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/__init__.py deleted file mode 100644 index 56fafa58b3f43decb7699b93048b8b87e0f695aa..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/ckpt_io.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/ckpt_io.py deleted file mode 100644 index 05342ce41a60fc5509ee3741f448b28c9f934300..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/ckpt_io.py +++ 
/dev/null @@ -1,88 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -Helper functions for IO -""" - -import json -import os -from typing import Any, Dict, Tuple, Union - -import torch -from torch.optim.lr_scheduler import _LRScheduler -from torch.optim.optimizer import Optimizer - -from colossalai.booster import Booster -from colossalai.cluster import DistCoordinator - - -def load_json(file_path: Union[str, os.PathLike]) -> Dict[str, Any]: - """ - Load file in JSON format - """ - with open(file=file_path, mode="r", encoding="utf-8") as fp: - return json.load(fp) - - -def save_json(data: Dict[str, Any], file_path: Union[str, os.PathLike]) -> None: - """ - Save as JSON format - """ - with open(file=file_path, mode="w", encoding="utf-8") as fp: - json.dump(data, fp=fp, ensure_ascii=False, indent=4) - - -def save_checkpoint( - save_dir: Union[str, os.PathLike], - booster: Booster, - model: torch.nn.Module, - optimizer: Optimizer, - lr_scheduler: _LRScheduler, - epoch: int, - step: int, - batch_size: int, - coordinator: DistCoordinator, -) -> None: - """ - Save model checkpoint, optimizer, LR scheduler and intermedidate running states. - """ - - save_dir = os.path.join(save_dir, f"epoch-{epoch}_step-{step}") - os.makedirs(os.path.join(save_dir, "modeling"), exist_ok=True) - - booster.save_model(model, os.path.join(save_dir, "modeling"), shard=True) - - booster.save_optimizer(optimizer, os.path.join(save_dir, "optimizer"), shard=True) - booster.save_lr_scheduler(lr_scheduler, os.path.join(save_dir, "lr_scheduler")) - running_states = { - "epoch": epoch, - "step": step, - "sample_start_index": step * batch_size, - } - if coordinator.is_master(): - save_json(running_states, os.path.join(save_dir, "running_states.json")) - - -def load_checkpoint( - load_dir: Union[str, os.PathLike], - booster: Booster, - model: torch.nn.Module, - optimizer: Optimizer, - lr_scheduler: _LRScheduler, -) -> Tuple[int, int, int]: - """ - Load model checkpoint, optimizer, LR scheduler and intermedidate running states. - """ - - # Update booster params states. 
-    booster.load_model(model=model, checkpoint=os.path.join(load_dir, "modeling"))
-    booster.load_optimizer(optimizer=optimizer, checkpoint=os.path.join(load_dir, "optimizer"))
-    booster.load_lr_scheduler(lr_scheduler=lr_scheduler, checkpoint=os.path.join(load_dir, "lr_scheduler"))
-
-    running_states = load_json(file_path=os.path.join(load_dir, "running_states.json"))
-    return (
-        running_states["epoch"],
-        running_states["step"],
-        running_states["sample_start_index"],
-    )
diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/froze.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/froze.py
deleted file mode 100644
index 82677160d868301b357f83241fd4ae1592d0b841..0000000000000000000000000000000000000000
--- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/froze.py
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
-from transformers.models.llama import LlamaForCausalLM
-
-
-def freeze_non_embeds_parameters(model: LlamaForCausalLM) -> None:
-    """Freeze all parameters except embeddings."""
-    for name, params in model.named_parameters():
-        if "embed_tokens" not in name and "lm_head" not in name:
-            params.requires_grad = False
-        else:
-            params.requires_grad = True
-
-
-def unfreeze_parameters(model: LlamaForCausalLM) -> None:
-    for name, params in model.named_parameters():
-        params.requires_grad = False
diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/neftune_patch.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/neftune_patch.py
deleted file mode 100644
index 21d769f3c49f9fe5cb5fc46d3c04157a105f152a..0000000000000000000000000000000000000000
--- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/neftune_patch.py
+++ /dev/null
@@ -1,72 +0,0 @@
-# Copyright 2023 The Hugging Face team
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import torch
-
-
-def unwrap(model):
-    if hasattr(model, "module"):
-        return model.unwrap()
-    else:
-        return model
-
-
-def neftune_post_forward_hook(module, input, output):
-    """
-    Implements the NEFTune forward pass for the model using forward hooks. Note this works only for torch.nn.Embedding
-    layers. This method is slightly adapted from the original source code that can be found here:
-    https://github.com/neelsjain/NEFTune Simply add it to your model as follows:
-    ```python
-    model = ...
-    model.embed_tokens.neftune_noise_alpha = 0.1
-    model.embed_tokens.register_forward_hook(neftune_post_forward_hook)
-    ```
-    Args:
-        module (`torch.nn.Module`):
-            The embedding module where the hook is attached. Note that you need to set `module.neftune_noise_alpha` to
-            the desired noise alpha value.
-        input (`torch.Tensor`):
-            The input tensor to the model.
-        output (`torch.Tensor`):
-            The output tensor of the model (i.e. the embeddings).
- """ - if module.training: - dims = torch.tensor(output.size(1) * output.size(2)) - mag_norm = module.neftune_noise_alpha / torch.sqrt(dims) - output = output + torch.zeros_like(output).uniform_(-mag_norm, mag_norm) - return output - - -def activate_neftune(model, neftune_noise_alpha=0.1): - r""" - Activates the neftune as presented in this code: https://github.com/neelsjain/NEFTune and paper: - https://arxiv.org/abs/2310.05914 - """ - embeddings = unwrap(model).get_input_embeddings() - - embeddings.neftune_noise_alpha = neftune_noise_alpha - hook_handle = embeddings.register_forward_hook(neftune_post_forward_hook) - neftune_hook_handle = hook_handle - - return model, neftune_hook_handle - - -def deactivate_neftune(model, neftune_hook_handle): - """ - Deactivates the neftune method. Make sure to call `_activate_neftune` first. - """ - embeddings = unwrap(model).get_input_embeddings() - - neftune_hook_handle.remove() - del embeddings.neftune_noise_alpha diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/stream_chat_patch.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/stream_chat_patch.py deleted file mode 100644 index 44fa3678d621c3103ebf22413fdd98188c67d183..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/stream_chat_patch.py +++ /dev/null @@ -1,252 +0,0 @@ -from copy import deepcopy -from typing import Any, Callable, Dict, List, Optional, Tuple - -import torch -from torch import nn -from transformers import PreTrainedTokenizer -from transformers.generation.utils import GenerationConfig, LogitsProcessorList, StoppingCriteriaList -from transformers.utils import logging - -logger = logging.get_logger(__name__) - - -def get_prompt_template( - input_query: str, - history: List[Dict] = None, - roles: list = ["", "Human", "Assistant"], -) -> str: - """ - Generates a prompt template for chat models based on input and history. - - Args: - input_query (str): User's current input query. - history (List[Dict], optional): List of past conversations, each a dict with 'role' and 'message'. - roles (list): Specifies the roles in the conversation, defaults to ["", "Human", "Assistant"]. - - Returns: - str: A formatted prompt including the input query and history. - """ - prompt = "" - if history is None: - new_history = [] - else: - new_history = deepcopy(history) - - new_history.append({"role": roles[1], "message": input_query.strip()}) - new_history.append({"role": roles[2], "message": None}) - - for _, item in enumerate(new_history): - role = item.get("role") - message = item.get("message") - if role == roles[0]: - prompt += f"{message}\n\n" - else: - if message: - prompt += f"{role}: {message}" - else: - prompt += f"{role}: " - return prompt - - -@torch.inference_mode() -def streaming_chat( - model: Any, - tokenizer: PreTrainedTokenizer, - input_query: str, - history: List[Dict] = None, - roles: list = ["", "Human", "Assistant"], - past_key_values: Tuple[Tuple[torch.FloatTensor, Any], Any] = None, - temperature: float = 0.8, - top_p: float = 0.95, - top_k: int = 50, - do_sample: bool = True, - length_penalty: float = 1.2, - max_new_tokens: int = 512, - logits_processor: LogitsProcessorList = None, - return_past_key_values: bool = False, - **kwargs, -): - """ - Streaming chat responses generation with a given model and tokenizer. - - Args: - model (Any): The language model to generate responses. 
- tokenizer (PreTrainedTokenizer): Tokenizer compatible with the model, used for encoding inputs and decoding responses. - input_query (str): The current user input to respond to. - history (List[Dict], optional): A list of past conversations, where each conversation is a dictionary with keys 'role' and 'message'. - roles (list): Roles involved in the conversation, defaults to ["", "Human", "Assistant"]. - past_key_values (Tuple[Tuple[torch.FloatTensor, Any], Any], optional): Past key values for incremental decoding. - temperature (float): The temperature value for token sampling, defaults to 0.8. - top_p (float): Nucleus sampling probability threshold, defaults to 0.95. - top_k (int): Top-K filtering threshold, defaults to 50. - do_sample (bool): Whether to sample responses, defaults to True. - length_penalty (float): Penalty for response length, defaults to 1.2. - max_new_tokens (int): Maximum number of new tokens to generate, defaults to 512. - logits_processor (LogitsProcessorList, optional): Custom logits processors, defaults to None. - return_past_key_values (bool): Whether to return past key values for further incremental decoding, defaults to False. - **kwargs: Additional keyword arguments for generation. - - Yields: - Tuple[str, List[Dict], Optional[Tuple[Tuple[torch.FloatTensor, Any], Any]]]: A tuple containing the generated response, updated history, and - optionally the updated past key values if `return_past_key_values` is True. - - Ensures padding is on the left side for the tokenizer. - """ - assert tokenizer.padding_side == "left", "Current generation only supports left padding." - if history is None: - history = [] - if logits_processor is None: - logits_processor = LogitsProcessorList() - - generation_kwargs = { - "temperature": temperature, - "top_p": top_p, - "top_k": top_k, - "do_sample": do_sample, - "max_new_tokens": max_new_tokens, - "length_penalty": length_penalty, - "use_cache": True, - **kwargs, - } - - prompt_str = get_prompt_template(input_query, history=history, roles=roles) - - eos_token_id = [tokenizer.eos_token_id] - inputs = tokenizer(prompt_str, return_tensors="pt").to(model.device) - history.append({"role": roles[1], "message": input_query.strip()}) - history.append({"role": roles[2], "message": None}) - - for outputs in stream_generate( - model, - **inputs, - past_key_values=past_key_values, - eos_token_id=eos_token_id, - return_past_key_values=return_past_key_values, - **generation_kwargs, - ): - if return_past_key_values: - outputs, past_key_values = outputs - - outputs = outputs.tolist()[0][len(inputs["input_ids"][0]) : -1] - response = tokenizer.decode(outputs) - - history[-1]["message"] = response.strip() - if return_past_key_values: - yield response, history, past_key_values - else: - yield response, history - - -@torch.inference_mode() -def stream_generate( - model: Any, - input_ids: torch.Tensor, - generation_config: Optional[GenerationConfig] = None, - logits_processor: Optional[LogitsProcessorList] = None, - stopping_criteria: Optional[StoppingCriteriaList] = None, - prefix_allowed_tokens_fn: Optional[Callable[[int, torch.Tensor], List[int]]] = None, - return_past_key_values: bool = False, - **kwargs, -): - """ - Generates sequences of token ids using the specified model and generation parameters. - Adapted from https://huggingface.co/THUDM/chatglm3-6b/blob/main/modeling_chatglm.py - - Args: - model (Any): The model used for generating sequences of token ids. 
- input_ids (torch.Tensor): The sequence used as a prompt for the generation or as model inputs to the encoder. - generation_config (Optional[GenerationConfig]): The generation configuration to be used as base parametrization for the generation call. - logits_processor (Optional[LogitsProcessorList]): Custom logits processors that complement the default logits processors built from arguments - and generation config. - stopping_criteria (Optional[StoppingCriteriaList]): Custom stopping criteria that complement the default stopping criteria built from arguments - and a generation config. - prefix_allowed_tokens_fn (Optional[Callable[[int, torch.Tensor], List[int]]]): Function to constrain token generation. - return_past_key_values (bool): Whether to return past key values for further incremental decoding, defaults to False. - **kwargs: Additional parameters for model generation. - - Yields: - torch.Tensor: The generated token IDs, updated after each generation step. - Optional[Tuple[Tuple[torch.FloatTensor, Any], Any]]: The past key values, returned if `return_past_key_values` is True, defaults to False. - """ - input_ids_len = input_ids.size(1) - - if generation_config is None: - generation_config = model.generation_config - generation_config = deepcopy(generation_config) - model_kwargs = generation_config.update(**kwargs) - - eos_token_id = generation_config.eos_token_id - if isinstance(eos_token_id, int): - eos_token_id = [eos_token_id] - eos_token_id_tensor = torch.tensor(eos_token_id).to(input_ids.device) if eos_token_id is not None else None - - if generation_config.max_new_tokens is not None: - generation_config.max_length = generation_config.max_new_tokens + input_ids_len - - if input_ids_len >= generation_config.max_length: - input_ids_string = "decoder_input_ids" if model.config.is_encoder_decoder else "input_ids" - logger.warning( - f"Input length of {input_ids_string} is {input_ids_len}, but `max_length` is set to" - f" {generation_config.max_length}. This can lead to unexpected behavior. You should consider" - " increasing `max_new_tokens`." 
- ) - logits_processor = logits_processor if logits_processor is not None else LogitsProcessorList() - stopping_criteria = stopping_criteria if stopping_criteria is not None else StoppingCriteriaList() - - # prepare distribution pre_processing samplers - logits_processor = model._get_logits_processor( - generation_config=generation_config, - input_ids_seq_length=input_ids_len, - encoder_input_ids=input_ids, - prefix_allowed_tokens_fn=prefix_allowed_tokens_fn, - logits_processor=logits_processor, - ) - - # prepare stopping criteria - stopping_criteria = model._get_stopping_criteria( - generation_config=generation_config, stopping_criteria=stopping_criteria - ) - - logits_warper = model._get_logits_warper(generation_config) - unfinished_sequences = input_ids.new(input_ids.shape[0]).fill_(1) - scores = None - - while True: - model_inputs = model.prepare_inputs_for_generation(input_ids, **model_kwargs) - # forward pass to get next token - outputs = model( - **model_inputs, - return_dict=True, - output_attentions=False, - output_hidden_states=False, - ) - - # NOTE: this is correct only in left padding mode - # pre-process distribution - next_token_logits = outputs.logits[:, -1, :] - next_token_scores = logits_processor(input_ids, next_token_logits) - next_token_scores = logits_warper(input_ids, next_token_scores) - - # sample - probs = nn.functional.softmax(next_token_scores, dim=-1) - if generation_config.do_sample: - next_tokens = torch.multinomial(probs, num_samples=1).squeeze(1) - else: - next_tokens = torch.argmax(probs, dim=-1) - - # update generated ids, model inputs, and length for next step - input_ids = torch.cat([input_ids, next_tokens[:, None]], dim=-1) - model_kwargs = model._update_model_kwargs_for_generation( - outputs, model_kwargs, is_encoder_decoder=model.config.is_encoder_decoder - ) - unfinished_sequences = unfinished_sequences.mul( - next_tokens.tile(eos_token_id_tensor.shape[0], 1).ne(eos_token_id_tensor.unsqueeze(1)).prod(dim=0) - ) - - if return_past_key_values: - yield input_ids, outputs.past_key_values - else: - yield input_ids - # stop when each sentence is finished, or if exceed the maximum length - if unfinished_sequences.max() == 0 or stopping_criteria(input_ids, scores): - break diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/utils.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/utils.py deleted file mode 100644 index f24ab72c47c923674b6e55d340cebbdac3c1caaa..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/colossal_llama/utils/utils.py +++ /dev/null @@ -1,36 +0,0 @@ -""" -Utils for Colossal-LLaMA -""" - -import torch -import torch.distributed as dist - -from colossalai.booster import Plugin - - -def all_reduce_mean(tensor: torch.Tensor, plugin: Plugin = None) -> torch.Tensor: - if plugin is not None: - dist.all_reduce(tensor=tensor, op=dist.ReduceOp.SUM, group=plugin.dp_group) - tensor.div_(plugin.dp_size) - else: - dist.all_reduce(tensor=tensor, op=dist.ReduceOp.SUM) - tensor.div_(dist.get_world_size()) - return tensor - - -def get_model_numel(model: torch.nn.Module) -> int: - return sum(p.numel() for p in model.parameters()) - - -def format_numel_str(numel: int) -> str: - B = 1024**3 - M = 1024**2 - K = 1024 - if numel >= B: - return f"{numel / B:.2f} B" - elif numel >= M: - return f"{numel / M:.2f} M" - elif numel >= K: - return f"{numel / K:.2f} K" - else: - return f"{numel}" diff --git 
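The `stream_generate` loop above decodes one token per iteration: it takes the logits at the last position, runs them through the logits processors and warpers, samples (or argmaxes) the next token, appends it to the sequence, and yields the running ids. A minimal sketch of that per-step sampling, assuming plain PyTorch only; the toy prompt ids, vocabulary size, and random logits below are illustrative stand-ins, not values from this repository:

```python
import torch
import torch.nn.functional as F


def sample_next_token(logits: torch.Tensor, do_sample: bool = True, temperature: float = 0.8) -> torch.Tensor:
    """Pick the next token id from last-position logits of shape [batch, vocab]."""
    if do_sample:
        probs = F.softmax(logits / temperature, dim=-1)
        return torch.multinomial(probs, num_samples=1).squeeze(1)  # [batch]
    return torch.argmax(logits, dim=-1)  # [batch]


# Toy usage: extend a prefix by one token, as each loop iteration does.
input_ids = torch.tensor([[1, 2, 3]])    # dummy prompt ids, shape [batch=1, seq_len=3]
fake_logits = torch.randn(1, 32000)      # stand-in for model(...).logits[:, -1, :]
next_token = sample_next_token(fake_logits)
input_ids = torch.cat([input_ids, next_token[:, None]], dim=-1)
print(input_ids.shape)                   # torch.Size([1, 4])
```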
a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/dataset/convert_data.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/dataset/convert_data.py deleted file mode 100644 index 3ede43c95dd045b1d40bcf23bac7963c0f58a5a2..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/dataset/convert_data.py +++ /dev/null @@ -1,37 +0,0 @@ -# Copyright (c) 2024, Shanghai Iluvatar CoreX Semiconductor Co., Ltd. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import json - -with open('dataset/school_math/school_math_0.25M.jsonl', 'r', encoding='utf-8') as file: - lines=file.readlines() - -res_datas=[] -for line in lines: - data=json.loads(line.strip()) - human_content=data["conversation"][0]["human"] - assistant_content=data["conversation"][0]["assistant"] - - Res_data={"messages": [{"from": "human", "content": human_content}, {"from": "assistant", "content": assistant_content}]} - - res_datas.append(Res_data) - # print(Res_data) - if len(res_datas) > 10000: - break - -with open('dataset/school_math/convert/school_math_0.25M_convert.jsonl', 'w', encoding='utf-8') as file: - for res_data in res_datas: - file.write(json.dumps(res_data, ensure_ascii=False)+'\n') - - diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/dataset/prepare_pretrain_dataset.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/dataset/prepare_pretrain_dataset.py deleted file mode 100644 index d57143f0cde3e012c1ec930f0f85c8d5b7c26a5a..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/dataset/prepare_pretrain_dataset.py +++ /dev/null @@ -1,155 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# Copyright (c) 2024, Shanghai Iluvatar CoreX Semiconductor Co., Ltd. -# All Rights Reserved. 
- -""" -Prepare dataset for continual pre-training -""" - -import argparse -import json -import math -import os -import time -from multiprocessing import cpu_count - -import sys -sys.path.append(os.path.join(os.path.dirname(__file__), "../")) -from colossal_llama.dataset.spliced_and_tokenized_dataset import ( - ClosedToConstantLengthSplicedDataset, - supervised_tokenize_pretrain, - supervised_tokenize_pretrain_webtext -) -from datasets import dataset_dict, load_dataset -from transformers import AutoTokenizer - -from colossalai.logging import get_dist_logger - -logger = get_dist_logger() - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument( - "--data_input_dirs", - type=str, - required=True, - default=None, - help="Comma(i.e., ',') separated list of all data directories containing `.jsonl` data files.", - ) - parser.add_argument( - "--tokenizer_dir", type=str, required=True, default=None, help="A directory containing the tokenizer" - ) - parser.add_argument("--data_output_dirs", type=str, default="data_output_dirs", help="Data output directory") - parser.add_argument("--max_length", type=int, default=8192, help="Max length of each spliced tokenized sequence") - parser.add_argument("--num_spliced_dataset_bins", type=int, default=10, help="Number of spliced dataset bins") - parser.add_argument("--dataset_type", type=str, default="webtext", help="dataset type") - args = parser.parse_args() - - if args.num_spliced_dataset_bins >= 100000: - raise ValueError("Too many spliced divisions, must be smaller than 100000") - - args.data_cache_dir = os.path.join(args.data_output_dirs, "cache") - args.data_jsonl_output_dir = os.path.join(args.data_output_dirs, "jsonl") - args.data_arrow_output_dir = os.path.join(args.data_output_dirs, "arrow") - - if not os.path.exists(args.data_cache_dir): - os.makedirs(args.data_cache_dir) - if not os.path.exists(args.data_jsonl_output_dir): - os.makedirs(args.data_jsonl_output_dir) - if not os.path.exists(args.data_arrow_output_dir): - os.makedirs(args.data_arrow_output_dir) - - # Prepare to all input datasets - input_data_paths = [] - input_data_dirs = args.data_input_dirs.split(",") - for ds_dir in input_data_dirs: - ds_dir = os.path.abspath(ds_dir) - assert os.path.exists(ds_dir), f"Not find data dir {ds_dir}" - ds_files = [name for name in os.listdir(ds_dir) if name.endswith(".jsonl")] - ds_paths = [os.path.join(ds_dir, name) for name in ds_files] - input_data_paths.extend(ds_paths) - - # Prepare to data splitting. - train_splits = [] - split_interval = math.ceil(100 / args.num_spliced_dataset_bins) - for i in range(0, 100, split_interval): - start = i - end = i + split_interval - if end > 100: - end = 100 - train_splits.append(f"train[{start}%:{end}%]") - - # Prepare to the tokenizer. 
- tokenizer = AutoTokenizer.from_pretrained(args.tokenizer_dir) - tokenizer.add_bos_token = False - tokenizer.add_eos_token = False - if tokenizer.pad_token is None: - tokenizer.pad_token = tokenizer.unk_token - - list_dataset = load_dataset( - path="json", - data_files=input_data_paths, - cache_dir=os.path.join(args.data_cache_dir, "raw"), - keep_in_memory=False, - split=train_splits, - num_proc=cpu_count(), - ) - for index, dataset in enumerate(list_dataset): - assert isinstance(dataset, dataset_dict.Dataset) - logger.info(f"Start to process part-{index}/{len(list_dataset)} of all original datasets.") - dataset = dataset.map( - function=supervised_tokenize_pretrain_webtext if args.dataset_type =="webtext" else supervised_tokenize_pretrain, - fn_kwargs={"tokenizer": tokenizer, "max_length": args.max_length}, - keep_in_memory=False, - num_proc=min(len(dataset), cpu_count()), - ) - if args.dataset_type =="webtext": - dataset = dataset.remove_columns(column_names=["id", "text", "length", "ended"]) - dataset = dataset.sort(column_names=("seq_length"), reverse=False, keep_in_memory=False) - dataset = dataset.remove_columns(column_names=["seq_length"]) - else: - dataset = dataset.remove_columns(column_names=["source", "target", "category"]) - dataset = dataset.sort(column_names=("seq_category", "seq_length"), reverse=False, keep_in_memory=False) - dataset = dataset.remove_columns(column_names=["seq_category", "seq_length"]) - spliced_dataset = ClosedToConstantLengthSplicedDataset( - dataset=dataset, tokenizer=tokenizer, max_length=args.max_length, error_strict=False - ) - # Save each jsonl spliced dataset. - output_index = "0" * (5 - len(str(index))) + str(index) - output_name = f"part-{output_index}" - output_jsonl_path = os.path.join(args.data_jsonl_output_dir, output_name + ".jsonl") - st = time.time() - with open(file=output_jsonl_path, mode="w", encoding="utf-8") as fp_writer: - spliced_count = 0 - for spliced_data_point in spliced_dataset: - if spliced_count % 500 == 0: - logger.info(f"processing {spliced_count} spliced data points for {fp_writer.name}") - spliced_count += 1 - fp_writer.write(json.dumps(spliced_data_point, ensure_ascii=False) + "\n") - logger.info( - f"Current file {fp_writer.name}; " - f"Data size: {len(spliced_dataset)}; " - f"Spliced data size: {spliced_dataset.current_size}; " - f"Splicing compression rate: {round(spliced_dataset.current_size / len(spliced_dataset), 6)}; " - f"Time cost: {round((time.time() - st) / 60, 6)} minutes." 
- ) - - # Save each arrow spliced dataset - output_arrow_path = os.path.join(args.data_arrow_output_dir, output_name) - logger.info(f"Start to save {output_arrow_path}") - spliced_dataset = load_dataset( - path="json", - data_files=[output_jsonl_path], - cache_dir=os.path.join(args.data_cache_dir, "spliced_and_tokenized"), - keep_in_memory=False, - num_proc=cpu_count(), - split="train", - ) - spliced_dataset.save_to_disk(dataset_path=output_arrow_path, num_proc=min(len(spliced_dataset), cpu_count())) - - -if __name__ == "__main__": - main() diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/dataset/prepare_sft_dataset.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/dataset/prepare_sft_dataset.py deleted file mode 100644 index c6efb8a6490b12ad5ecd557f99a3341be2953be5..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/dataset/prepare_sft_dataset.py +++ /dev/null @@ -1,154 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# Copyright (c) 2024, Shanghai Iluvatar CoreX Semiconductor Co., Ltd. -# All Rights Reserved. - -""" -Prepare sft dataset for fine-tuning -""" - -import argparse -import json -import math -import os -from multiprocessing import cpu_count -import sys -sys.path.append(os.path.join(os.path.dirname(__file__), "../")) -from colossal_llama.dataset.conversation import LLaMA2_Conv, LLaMA3_Conv -from colossal_llama.dataset.spliced_and_tokenized_dataset import supervised_tokenize_sft -from datasets import dataset_dict, load_dataset -from transformers import AddedToken, AutoTokenizer - -from colossalai.logging import get_dist_logger - -logger = get_dist_logger() - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument( - "--data_input_dirs", - type=str, - required=True, - default=None, - help="Comma(i.e., ',') separated list of all data directories containing `.jsonl` data files.", - ) - parser.add_argument( - "--tokenizer_dir", type=str, required=True, default=None, help="A directory containing the tokenizer" - ) - parser.add_argument("--data_output_dirs", type=str, default="data_output_dirs", help="Data output directory") - parser.add_argument("--max_length", type=int, default=8192, help="Max length of each spliced tokenized sequence") - parser.add_argument("--num_spliced_dataset_bins", type=int, default=10, help="Number of spliced dataset bins") - parser.add_argument("--llama_version", type=int, default=3, help="LLaMA version") - args = parser.parse_args() - - if args.num_spliced_dataset_bins >= 100000: - raise ValueError("Too many spliced divisions, must be smaller than 100000") - - args.data_cache_dir = os.path.join(args.data_output_dirs, "cache") - args.data_jsonl_output_dir = os.path.join(args.data_output_dirs, "jsonl") - args.data_arrow_output_dir = os.path.join(args.data_output_dirs, "arrow") - - if not os.path.exists(args.data_cache_dir): - os.makedirs(args.data_cache_dir) - if not os.path.exists(args.data_jsonl_output_dir): - os.makedirs(args.data_jsonl_output_dir) - if not os.path.exists(args.data_arrow_output_dir): - os.makedirs(args.data_arrow_output_dir) - - # Prepare to all input datasets - input_data_paths = [] - input_data_dirs = args.data_input_dirs.split(",") - for ds_dir in input_data_dirs: - ds_dir = os.path.abspath(ds_dir) - assert os.path.exists(ds_dir), f"Not find data dir {ds_dir}" - ds_files = [name for name in os.listdir(ds_dir) if name.endswith(".jsonl")] - ds_paths = [os.path.join(ds_dir, name) for name in ds_files] - 
input_data_paths.extend(ds_paths) - - # Prepare to data splitting. - train_splits = [] - split_interval = math.ceil(100 / args.num_spliced_dataset_bins) - for i in range(0, 100, split_interval): - start = i - end = i + split_interval - if end > 100: - end = 100 - train_splits.append(f"train[{start}%:{end}%]") - - # Prepare to the tokenizer. - tokenizer = AutoTokenizer.from_pretrained(args.tokenizer_dir) - - default_conversation = LLaMA3_Conv - - # Fix split issue: https://github.com/huggingface/transformers/issues/23833 - if args.llama_version == 2: - tokenizer.add_tokens(AddedToken("", normalized=False, special=True), special_tokens=True) - default_conversation = LLaMA2_Conv - - tokenizer.add_bos_token = False - tokenizer.add_eos_token = False - if tokenizer.pad_token is None: - if tokenizer.unk_token is not None: - tokenizer.pad_token = tokenizer.unk_token - else: - tokenizer.pad_token = tokenizer.eos_token - tokenizer.unk_token = tokenizer.eos_token - - list_dataset = load_dataset( - path="json", - data_files=input_data_paths, - cache_dir=os.path.join(args.data_cache_dir, "raw"), - keep_in_memory=False, - split=train_splits, - num_proc=cpu_count(), - ) - for index, dataset in enumerate(list_dataset): - assert isinstance(dataset, dataset_dict.Dataset) - logger.info(f"Start to process part-{index}/{len(list_dataset)} of all original datasets.") - dataset = dataset.map( - function=supervised_tokenize_sft, - fn_kwargs={ - "tokenizer": tokenizer, - "conversation_template": default_conversation, - "max_length": args.max_length, - }, - keep_in_memory=False, - num_proc=min(len(dataset), cpu_count()), - ) - - dataset = dataset.filter(lambda data: data["labels"] is not None) - dataset = dataset.sort(column_names=("seq_category", "seq_length"), reverse=False, keep_in_memory=False) - - # We don't concatenate data samples here. - spliced_dataset = dataset - # Save each jsonl spliced dataset. - output_index = "0" * (5 - len(str(index))) + str(index) - output_name = f"part-{output_index}" - output_jsonl_path = os.path.join(args.data_jsonl_output_dir, output_name + ".jsonl") - # st = time.time() - with open(file=output_jsonl_path, mode="w", encoding="utf-8") as fp_writer: - spliced_count = 0 - for spliced_data_point in spliced_dataset: - if spliced_count % 500 == 0: - logger.info(f"processing {spliced_count} spliced data points for {fp_writer.name}") - spliced_count += 1 - fp_writer.write(json.dumps(spliced_data_point, ensure_ascii=False) + "\n") - - # Save each arrow spliced dataset - output_arrow_path = os.path.join(args.data_arrow_output_dir, output_name) - logger.info(f"Start to save {output_arrow_path}") - spliced_dataset = load_dataset( - path="json", - data_files=[output_jsonl_path], - cache_dir=os.path.join(args.data_cache_dir, "spliced_and_tokenized"), - keep_in_memory=False, - num_proc=cpu_count(), - split="train", - ) - spliced_dataset.save_to_disk(dataset_path=output_arrow_path, num_proc=min(len(spliced_dataset), cpu_count())) - - -if __name__ == "__main__": - main() diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/get_Meta_LLaMA_8B.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/get_Meta_LLaMA_8B.py deleted file mode 100644 index 3143ff726c261695aa5f8cfe522c1a0455561e10..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/get_Meta_LLaMA_8B.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright (c) 2024, Shanghai Iluvatar CoreX Semiconductor Co., Ltd. -# All Rights Reserved. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -#模型下载 -from modelscope import snapshot_download -model_dir = snapshot_download('LLM-Research/Meta-Llama-3-8B') \ No newline at end of file diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/performance_evaluator.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/performance_evaluator.py deleted file mode 100644 index 7ad3394a838790ba39447e98a3cf83e538b8650e..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/performance_evaluator.py +++ /dev/null @@ -1,153 +0,0 @@ -# Copyright (c) 2024, Shanghai Iluvatar CoreX Semiconductor Co., Ltd. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from time import time -from typing import Optional - -import torch -import torch.distributed as dist -from torch import Tensor - -from colossalai.accelerator import get_accelerator -from colossalai.cluster import DistCoordinator -from colossal_llama.utils import utils - -def divide(x: float, y: float) -> float: - if y == 0: - return float("inf") - elif y == float("inf"): - return float("nan") - return x / y - - -@torch.no_grad() -def all_reduce_mean(x: float, world_size: int) -> float: - if world_size == 1: - return x - tensor = torch.tensor([x], device=get_accelerator().get_current_device()) - dist.all_reduce(tensor) - tensor = tensor / world_size - return tensor.item() - - -class Timer: - def __init__(self) -> None: - self.start_time: Optional[float] = None - self.duration: float = 0.0 - - def start(self) -> None: - self.start_time = time() - - def end(self) -> None: - assert self.start_time is not None - self.duration = time() - self.start_time - self.start_time = None - - def reset(self) -> None: - self.duration = 0.0 - - -class PerformanceEvaluator: - """ - Callback for valuate the performance of the model. - Args: - actor_num_params: The number of parameters of the actor model. - critic_num_params: The number of parameters of the critic model. - initial_model_num_params: The number of parameters of the initial model. - reward_model_num_params: The number of parameters of the reward model. - enable_grad_checkpoint: Whether to enable gradient checkpointing. - ignore_episodes: The number of episodes to ignore when calculating the performance. 
- """ - - def __init__( - self, - model_numel: int, - num_layers: int, - hidden_size: int, - vocab_size: int, - enable_grad_checkpoint: bool = False, - ignore_steps: int = 0, - dp_world_size: Optional[int] = None, - ) -> None: - self.model_numel = model_numel - self.enable_grad_checkpoint = enable_grad_checkpoint - self.ignore_steps = ignore_steps - self.num_layers = num_layers - self.hidden_size = hidden_size - self.vocab_size = vocab_size - - self.coordinator = DistCoordinator() - self.dp_world_size = dp_world_size or self.coordinator.world_size - self.disable: bool = False - self.timer = Timer() - self.num_samples: int = 0 - self.flop_megatron = 0 - self.flop: int = 0 - self.tokens_per_second_per_devices = [] - self.avg_tflops_per_gpus = [] - - def on_step_start(self, step: int) -> None: - self.disable = self.ignore_steps > 0 and step < self.ignore_steps - self.step = step - # if self.disable: - # return - get_accelerator().synchronize() - self.timer.start() - - def on_step_end(self, loss, inputs_size, plugin, **kwargs) -> None: - # if self.disable: - # return - get_accelerator().synchronize() - self.timer.end() - - batch_size, seq_len = inputs_size - - self.num_samples = batch_size - checkpoint_activations_factor = 3 + int(self.enable_grad_checkpoint) - self.flop_megatron = ( - 24 * checkpoint_activations_factor * batch_size * seq_len * self.num_layers * (self.hidden_size**2) - ) * ( - 1.0 + (seq_len / (6.0 * self.hidden_size)) + (self.vocab_size / (16.0 * self.num_layers * self.hidden_size)) - ) - self.flop = batch_size * seq_len * self.model_numel * 2 * (3 + int(self.enable_grad_checkpoint)) - - # def on_fit_end(self) -> None: - avg_duration = all_reduce_mean(self.timer.duration, self.coordinator.world_size) - avg_throughput = self.num_samples * self.dp_world_size / (avg_duration + 1e-12) - tokens_per_second_per_device = avg_throughput * seq_len * 2 / self.coordinator.world_size ## BI-V150 one device has two gpus - mp_world_size = self.coordinator.world_size // self.dp_world_size - avg_tflops_per_gpu_megatron = self.flop_megatron / 1e12 / (avg_duration + 1e-12) / mp_world_size - avg_tflops_per_gpu = self.flop / 1e12 / (avg_duration + 1e-12) / mp_world_size - - global_loss = None - if plugin.stage_manager.is_last_stage(): - global_loss = utils.all_reduce_mean(loss, plugin) - - - self.coordinator.print_on_last_process( - f"num_samples: {self.num_samples}, dp_world_size: {self.dp_world_size}, flop_megatron: {self.flop_megatron}, flop: {self.flop}, avg_duration: {avg_duration}, " - ) - self.coordinator.print_on_last_process( - f"loss:{global_loss}, Throughput: {avg_throughput:.2f} samples/sec , tokens_per_second_per_device: {tokens_per_second_per_device} , TFLOPS per GPU by Megatron: {avg_tflops_per_gpu_megatron:.2f} , TFLOPS per GPU: {avg_tflops_per_gpu:.2f}" - ) - - if self.step >= self.ignore_steps and self.step < self.ignore_steps + 5: - if self.step == self.ignore_steps + 4: - self.coordinator.print_on_last_process("\n ---------------------------------------------" + - f"\n average values of [{self.ignore_steps} - {self.ignore_steps + 5}) steps, tokens_per_second_per_device: {sum(self.tokens_per_second_per_devices)/len(self.tokens_per_second_per_devices):.2f} , TFLOPS per GPU: {sum(self.avg_tflops_per_gpus)/len(self.avg_tflops_per_gpus):.2f} " + - "\n ---------------------------------------------") - else: - self.tokens_per_second_per_devices.append(tokens_per_second_per_device) - self.avg_tflops_per_gpus.append(avg_tflops_per_gpu) \ No newline at end of file diff --git 
a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/prepare_sft_dataset.sh b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/prepare_sft_dataset.sh deleted file mode 100644 index dd1ce0c4eca06def8b7129439858ab00ff0540ce..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/prepare_sft_dataset.sh +++ /dev/null @@ -1,55 +0,0 @@ -#!/bin/bash - -# Copyright (c) 2024, Shanghai Iluvatar CoreX Semiconductor Co., Ltd. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# 本脚本可以带一个参数或者0个参数,指示llama版本,可为 "llama2" 或者 "llama3",如果无入参,则默认为llama2 - -set -euox pipefail -CUR_DIR=$(cd "$(dirname "$0")";pwd) -cd ${CUR_DIR} - -DATA_INPUT_DIRS=$CUR_DIR"/dataset/school_math/convert/" -mkdir -p $DATA_INPUT_DIRS - -python3 dataset/convert_data.py - -LLAMA_VER=${1:-"llama3"} -echo "LLaMA version:" $LLAMA_VER - -if [ $LLAMA_VER == "llama2" ]; then - # 代码中lable与input的错位需要,loss计算length为4096的sequence。 - MAX_LENGTH=4097 - TOKENIZER_DIR=/home/model_zoos/nlp/Llama-2-7b-hf - DATA_OUTPUT_DIRS=dataset/school_math/convert/llama2_data_sft - llama_ver=2 - -elif [ $LLAMA_VER == "llama3" ]; then - # 代码中lable与input的错位需要,loss计算length为8192的sequence。 - MAX_LENGTH=8193 - TOKENIZER_DIR=/home/model_zoos/nlp/Meta-Llama-3-8B - DATA_OUTPUT_DIRS=dataset/school_math/convert/llama3_data_sft - llama_ver=3 -else - echo "Error LLAMA_VER, please input correct LLaMA version" - exit 1 -fi - -python3 dataset/prepare_sft_dataset.py \ - --data_input_dirs $DATA_INPUT_DIRS \ - --data_output_dirs $DATA_OUTPUT_DIRS \ - --tokenizer_dir $TOKENIZER_DIR \ - --max_length $MAX_LENGTH \ - --llama_version $llama_ver diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/requirements.txt b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/requirements.txt deleted file mode 100644 index f02f85c6d2a3b9d61ed792e5b467858c13ed11fe..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/requirements.txt +++ /dev/null @@ -1,15 +0,0 @@ -# torch==2.1.2 -huggingface-hub -packaging==24.0 -colossalai>=0.4.0 -autoflake==2.2.1 -black==23.9.1 -transformers>=4.39.3 -tensorboard==2.14.0 -six==1.16.0 -datasets -ninja==1.11.1 -# flash-attn -tqdm -sentencepiece==0.1.99 -protobuf<=3.20.0 diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/run_llama3_8b_sft_3d.sh b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/run_llama3_8b_sft_3d.sh deleted file mode 100644 index bab2c5b7a5748d29a2dcb286d0410fc16ee7daaa..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/run_llama3_8b_sft_3d.sh +++ /dev/null @@ -1,62 +0,0 @@ -#!/bin/bash - -# Copyright (c) 2024, Shanghai Iluvatar CoreX Semiconductor Co., Ltd. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. 
You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -PARENT_SAVE_DIR="checkpoint" -PARENT_TENSORBOARD_DIR="tensorboard" -PARENT_CONFIG_FILE="config" - -TIMESTAMP=$(date +%Y-%m-%d-%H-%M-%S) -LOG_DIR="logs/${TIMESTAMP}" -SAVE_DIR="${LOG_DIR}/${PARENT_SAVE_DIR}" -TENSORBOARD_DIR="${LOG_DIR}/${PARENT_TENSORBOARD_DIR}" -CONFIG_FILE="${LOG_DIR}/${PARENT_CONFIG_FILE}.json" - -DATASET_PATH=./dataset/school_math/convert/llama3_data_sft/arrow/ -TOKENIZER_DIR=/home/model_zoos/nlp/Meta-Llama-3-8B -GLOBAL_BATCH_SIZE_PER_DP=8 -MICRO_BATCH_SIZE=1 - - -mkdir -p $LOG_DIR -colossalai run --nproc_per_node 16 train.py \ - --config "llama3_8b" \ - --dataset $DATASET_PATH \ - --tokenizer_dir $TOKENIZER_DIR \ - --max_length 8192 \ - --plugin "3d" \ - --zero_stage 1 \ - --pp 4 \ - --custom_ckpt \ - --custom_recompute_layers_per_stage 7 6 5 6 \ - --ignore_steps 2 \ - --save_interval 0 \ - --save_dir $SAVE_DIR \ - --tensorboard_dir $TENSORBOARD_DIR \ - --config_file $CONFIG_FILE \ - --num_epochs 1 \ - --batch_size $GLOBAL_BATCH_SIZE_PER_DP \ - --microbatch_size $MICRO_BATCH_SIZE \ - --lr 1e-4 \ - --mixed_precision "bf16" \ - --grad_clip 1.0 \ - --weight_decay 0.01 \ - --warmup_steps 100 \ - --use_grad_checkpoint \ - --use_flash_attn \ - --use_neft \ - --pad_token "eos" |& tee ${LOG_DIR}/output.log - diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/setup.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/setup.py deleted file mode 100644 index c9ba3169821874b8f67b679a6047f0b178e9028a..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/setup.py +++ /dev/null @@ -1,37 +0,0 @@ -from setuptools import find_packages, setup - - -def fetch_requirements(path): - with open(path, "r") as fd: - return [r.strip() for r in fd.readlines()] - - -def fetch_readme(): - with open("README.md", encoding="utf-8") as f: - return f.read() - - -def fetch_version(): - with open("version.txt", "r") as f: - return f.read().strip() - - -setup( - name="colossal_llama", - version=fetch_version(), - packages=find_packages(exclude=("*.egg-info",)), - description="Continual Pre-training and SFT for LLaMA", - long_description=fetch_readme(), - long_description_content_type="text/markdown", - license="Apache Software License 2.0", - url="https://github.com/hpcaitech/ColossalAI/tree/main/applications/Colossal-LLaMA", - install_requires=fetch_requirements("requirements.txt"), - python_requires=">=3.7", - classifiers=[ - "Programming Language :: Python :: 3", - "License :: OSI Approved :: Apache Software License", - "Environment :: GPU :: NVIDIA CUDA", - "Topic :: Scientific/Engineering :: Artificial Intelligence", - "Topic :: System :: Distributed Computing", - ], -) diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/train.example.sh b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/train.example.sh deleted file mode 100644 index b795e8bcf810f0f37b751454459d3d01fb4bcbc4..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/train.example.sh +++ /dev/null @@ -1,52 +0,0 @@ -#!/bin/bash -set_n_least_used_CUDA_VISIBLE_DEVICES() { - local 
n=${1:-"9999"} - echo "GPU Memory Usage:" - local FIRST_N_GPU_IDS=$(nvidia-smi --query-gpu=memory.used --format=csv | - tail -n +2 | - nl -v 0 | - tee /dev/tty | - sort -g -k 2 | - awk '{print $1}' | - head -n $n) - export CUDA_VISIBLE_DEVICES=$(echo $FIRST_N_GPU_IDS | sed 's/ /,/g') - echo "Now CUDA_VISIBLE_DEVICES is set to:" - echo "CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES" -} - -set_n_least_used_CUDA_VISIBLE_DEVICES 8 - -PROJECT_NAME="" -PARENT_SAVE_DIR="" -PARENT_TENSORBOARD_DIR="" -PARENT_CONFIG_FILE="" -PRETRAINED_MODEL_PATH="" - -declare -a dataset=( - "PATH TO THE DATASET" -) - -TIMESTAMP=$(date +%Y-%m-%d-%H-%M-%S) -FULL_PROJECT_NAME="${PROJECT_NAME}-${TIMESTAMP}" -SAVE_DIR="${PARENT_SAVE_DIR}${FULL_PROJECT_NAME}" -TENSORBOARD_DIR="${PARENT_TENSORBOARD_DIR}${FULL_PROJECT_NAME}" -CONFIG_FILE="${PARENT_CONFIG_FILE}${FULL_PROJECT_NAME}.json" - -colossalai run --nproc_per_node 8 --hostfile hostfile --master_port 30013 train.py \ - --pretrained $PRETRAINED_MODEL_PATH \ - --dataset ${dataset[@]} \ - --plugin "zero2" \ - --save_interval 400 \ - --save_dir $SAVE_DIR \ - --tensorboard_dir $TENSORBOARD_DIR \ - --config_file $CONFIG_FILE \ - --num_epochs 1 \ - --micro_batch_size 8 \ - --lr 1e-4 \ - --mixed_precision "bf16" \ - --grad_clip 1.0 \ - --weight_decay 0.01 \ - --warmup_steps 100 \ - --use_grad_checkpoint \ - --use_flash_attn \ - --pad_token "unk" diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/train.py b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/train.py deleted file mode 100644 index b7fa1da4165a0ef29b53bfd140453bb514b69e29..0000000000000000000000000000000000000000 --- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/train.py +++ /dev/null @@ -1,603 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# Copyright (c) 2024, Shanghai Iluvatar CoreX Semiconductor Co., Ltd. -# All Rights Reserved. 
- -""" -Continual Pre-training/Supervised fine-tuning of Colossal-LLaMA-2 developed by Colossal-AI Team -""" - -import argparse -import json -import os -import resource -from contextlib import nullcontext - -import torch -from colossal_llama.dataset.dummy_dataset import RandomDataset -from colossal_llama.dataset.loader import ( - DataCollatorForSupervisedDataset, - StatefulDistributedSampler, - load_tokenized_dataset, -) -from colossal_llama.utils.ckpt_io import load_checkpoint, save_checkpoint -from colossal_llama.utils.froze import freeze_non_embeds_parameters -from colossal_llama.utils.neftune_patch import activate_neftune, deactivate_neftune -from colossal_llama.utils.utils import all_reduce_mean, format_numel_str, get_model_numel -from torch.utils.tensorboard import SummaryWriter -from tqdm import tqdm -from transformers import AutoModelForCausalLM, AutoTokenizer - -import colossalai -from colossalai.accelerator import get_accelerator -from colossalai.booster import Booster -from colossalai.booster.plugin import GeminiPlugin, HybridParallelPlugin, LowLevelZeroPlugin, TorchDDPPlugin -from colossalai.cluster import DistCoordinator -from colossalai.lazy import LazyInitContext -from colossalai.nn.lr_scheduler import CosineAnnealingWarmupLR -from colossalai.nn.optimizer import HybridAdam -from colossalai.utils import get_current_device -from transformers.models.llama.configuration_llama import LlamaConfig -from transformers import AutoConfig, AutoModelForCausalLM -from colossalai.shardformer import PipelineGradientCheckpointConfig -from performance_evaluator import PerformanceEvaluator - -MODEL_CONFIGS = { - "7b": LlamaConfig(max_position_embeddings=4096), - "13b": LlamaConfig( - hidden_size=5120, - intermediate_size=13824, - num_hidden_layers=40, - num_attention_heads=40, - max_position_embeddings=4096, - ), - "70b": LlamaConfig( - hidden_size=8192, - intermediate_size=28672, - num_hidden_layers=80, - num_attention_heads=64, - max_position_embeddings=4096, - num_key_value_heads=8, - ), - "llama3_8b": LlamaConfig(max_position_embeddings=8192, - vocab_size=128256, - num_key_value_heads=8, - intermediate_size=14336, - rope_theta=500000), -} - -def train(args) -> None: - # ============================== - # Initialize Distributed Training - # ============================== - colossalai.launch_from_torch() - accelerator = get_accelerator() - coordinator = DistCoordinator() - - # ============================== - # Initialize Tensorboard and Save Config - # ============================== - if coordinator.is_master(): - os.makedirs(args.tensorboard_dir, exist_ok=True) - writer = SummaryWriter(args.tensorboard_dir) - - with open(args.config_file, "w") as f: - json.dump(args.__dict__, f, indent=4) - print(f"args:{args}") - # ============================== - # Initialize Booster - # ============================== - hybrid_kwargs = { - "gradient_checkpoint_config": PipelineGradientCheckpointConfig(num_ckpt_layers_per_stage=args.custom_recompute_layers_per_stage) if args.custom_ckpt else None, - "use_ixformer_mlp": args.use_ixformer_mlp, - "use_colo_llamaflashatten": args.use_colo_llamaflashatten, - "use_ixformer_fusedrmsnormres": args.use_ixformer_fusedrmsnormres, - } - if args.plugin == "ddp": - plugin = TorchDDPPlugin(find_unused_parameters=True if args.use_grad_checkpoint is False else False) - elif args.plugin == "gemini": - plugin = GeminiPlugin( - precision=args.mixed_precision, - initial_scale=2**16, - max_norm=args.grad_clip, - enable_gradient_accumulation=(args.accumulation_steps > 1), - 
enable_fused_normalization=torch.cuda.is_available(), - enable_flash_attention=args.use_flash_attn, - ) - elif args.plugin == "gemini_auto": - plugin = GeminiPlugin( - precision=args.mixed_precision, - placement_policy="auto", - initial_scale=2**16, - max_norm=args.grad_clip, - enable_gradient_accumulation=(args.accumulation_steps > 1), - enable_fused_normalization=torch.cuda.is_available(), - enable_flash_attention=args.use_flash_attn, - ) - elif args.plugin == "zero2": - plugin = LowLevelZeroPlugin( - stage=2, - precision=args.mixed_precision, - initial_scale=2**16, - max_norm=args.grad_clip, - ) - elif args.plugin == "zero2_cpu": - plugin = LowLevelZeroPlugin( - stage=2, - precision=args.mixed_precision, - initial_scale=2**16, - cpu_offload=True, - max_norm=args.grad_clip, - ) - elif args.plugin == "3d": - plugin = HybridParallelPlugin( - tp_size=args.tp, - pp_size=args.pp, - sp_size=args.sp, - sequence_parallelism_mode=args.sp_mode, - zero_stage=args.zero_stage, - enable_flash_attention=args.use_flash_attn, - enable_fused_normalization=torch.cuda.is_available() if "llama3" not in args.config else False, # ixformer 融合算子对于llama3尺寸暂时有问题,此处临时规避 - enable_sequence_parallelism=args.enable_sequence_parallelism, - cpu_offload=True if args.zero_stage >= 1 and args.zero_cpu_offload else False, - parallel_output=False, - max_norm=args.grad_clip, - precision=args.mixed_precision, - microbatch_size=args.microbatch_size, - **hybrid_kwargs, - ) - else: - raise ValueError(f"Unknown plugin {args.plugin}") - - booster = Booster(plugin=plugin) - - # ====================================================== - # Initialize Tokenizer, Dataset, Collator and Dataloader - # ====================================================== - tokenizer = AutoTokenizer.from_pretrained(args.tokenizer_dir) - if args.pad_token == "eos": - tokenizer.pad_token = tokenizer.eos_token - elif args.pad_token == "unk": - tokenizer.pad_token = tokenizer.unk_token - tokenizer.add_bos_token = False - tokenizer.add_eos_token = False - - coordinator.print_on_master( - f"Training Info:\nConfig file: {args.config_file} \nTensorboard logs: {args.tensorboard_dir} \nModel checkpoint: {args.save_dir}" - ) - - if args.benchmark: - coordinator.print_on_master(f"Run benchmark with {args.num_samples} random samples.") - dataset = RandomDataset( - num_samples=args.num_samples, max_length=args.max_length, vocab_size=tokenizer.vocab_size - ) - dataloader = plugin.prepare_dataloader( - dataset, - batch_size=args.batch_size, - shuffle=True, - drop_last=True, - seed=42, - distributed_sampler_cls=StatefulDistributedSampler, - ) - else: - coordinator.print_on_master(f"Load dataset: {args.dataset}") - dataset = load_tokenized_dataset(dataset_parrent_path=args.dataset, mode="train") - data_collator = DataCollatorForSupervisedDataset( - tokenizer=tokenizer, max_length=args.max_length, padding=args.padding_mode - ) - dataloader = plugin.prepare_dataloader( - dataset=dataset, - batch_size=args.batch_size, - num_workers=2, - shuffle=True, - drop_last=True, - collate_fn=data_collator, - distributed_sampler_cls=StatefulDistributedSampler, - ) - - coordinator.print_on_master( - f"Max device memory after data loader: {accelerator.max_memory_allocated() / 1024 ** 2:.2f} MB" - ) - - # ====================================================== - # Initialize Model, Objective, Optimizer and LR Scheduler - # ====================================================== - if args.config in MODEL_CONFIGS: - config = MODEL_CONFIGS[args.config] - else: - config = 
AutoConfig.from_pretrained(args.config, trust_remote_code=True) - - init_ctx = ( - LazyInitContext(default_device=get_current_device()) - if isinstance(plugin, (GeminiPlugin, HybridParallelPlugin)) - else nullcontext() - ) - with init_ctx: - if args.pretrained: - model = AutoModelForCausalLM.from_pretrained( - args.pretrained, - torch_dtype=torch.bfloat16 if args.mixed_precision == "bf16" else torch.float16, - trust_remote_code=True, - ) - else: - init_kwargs={} - if args.use_flash_attn or args.use_colo_llamaflashatten: - init_kwargs["attn_implementation"] = "flash_attention_2" - init_kwargs["torch_dtype"]=torch.bfloat16 if args.mixed_precision == "bf16" else torch.float16 - - model = AutoModelForCausalLM.from_config(config, - trust_remote_code=True, - **init_kwargs) - - # Freeze part of parameters. - if args.freeze_non_embeds_params: - freeze_non_embeds_parameters(model=model) - # this is essential, otherwise the grad checkpoint will not work. - model.train() - - if args.use_grad_checkpoint: - model.gradient_checkpointing_enable() - coordinator.print_on_master(msg="Gradient checkpointing enabled successfully") - - model_numel = get_model_numel(model) - coordinator.print_on_master(f"Model params: {format_numel_str(model_numel)}") - - optimizer = HybridAdam( - model_params=( - filter(lambda p: p.requires_grad, model.parameters()) - if args.freeze_non_embeds_params - else model.parameters() - ), - lr=args.lr, - betas=(0.9, 0.95), - weight_decay=args.weight_decay, - adamw_mode=True, - ) - - if args.warmup_steps is None: - args.warmup_steps = int(args.num_epochs * 0.025 * (len(dataloader) // args.accumulation_steps)) - coordinator.print_on_master(f"Warmup steps is set to {args.warmup_steps}") - - lr_scheduler = CosineAnnealingWarmupLR( - optimizer=optimizer, - total_steps=args.num_epochs * (len(dataloader) // args.accumulation_steps), - warmup_steps=args.warmup_steps, - eta_min=0.1 * args.lr, - ) - - # Flash attention will be disabled because it does NOT support fp32. 
-    default_dtype = torch.float16 if args.mixed_precision == "fp16" else torch.bfloat16
-    torch.set_default_dtype(default_dtype)
-    model, optimizer, _, dataloader, lr_scheduler = booster.boost(
-        model=model,
-        optimizer=optimizer,
-        lr_scheduler=lr_scheduler,
-        dataloader=dataloader,
-    )
-
-    torch.set_default_dtype(torch.float)
-
-    coordinator.print_on_master(
-        f"Booster init max device memory: {accelerator.max_memory_allocated() / 1024 ** 2:.2f} MB"
-    )
-    coordinator.print_on_master(
-        f"Booster init max CPU memory: {resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024:.2f} MB"
-    )
-
-    start_epoch = 0
-    start_step = 0
-    sampler_start_idx = 0
-    if args.load_checkpoint is not None:
-        if "modeling" in args.load_checkpoint:
-            coordinator.print_on_master(f"Continued pretrain from checkpoint {args.load_checkpoint}")
-            booster.load_model(model, args.load_checkpoint)
-        else:
-            coordinator.print_on_master(f"Load model checkpoint from {args.load_checkpoint}")
-            start_epoch, start_step, sampler_start_idx = load_checkpoint(
-                load_dir=args.load_checkpoint,
-                booster=booster,
-                model=model,
-                optimizer=optimizer,
-                lr_scheduler=lr_scheduler,
-            )
-            coordinator.print_on_master(
-                f"Loaded checkpoint {args.load_checkpoint} at epoch {start_epoch} step {start_step}"
-            )
-            coordinator.print_on_master(f"Loaded sample at index {sampler_start_idx}")
-
-        coordinator.print_on_master(
-            f"Checkpoint loaded max device memory: {accelerator.max_memory_allocated() / 1024 ** 2:.2f} MB"
-        )
-        coordinator.print_on_master(
-            f"Checkpoint loaded device memory: {accelerator.memory_allocated() / 1024 ** 2:.2f} MB"
-        )
-        coordinator.print_on_master(
-            f"Checkpoint loaded max CPU memory: {resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024:.2f} MB"
-        )
-
-    if args.use_neft:
-        coordinator.print_on_master("Activate NEFTune.")
-        model, handle = activate_neftune(model)
-
-    dp_size = getattr(plugin, "dp_size", coordinator.world_size)
-    performance_evaluator = PerformanceEvaluator(
-        model_numel,
-        model.module.config.num_hidden_layers,
-        model.module.config.hidden_size,
-        model.module.config.vocab_size,
-        args.use_grad_checkpoint,
-        args.ignore_steps,
-        dp_world_size=dp_size,
-    )
-
-    num_steps_per_epoch = len(dataloader) // args.accumulation_steps
-    # If resuming training, set the sampler start index to the correct value
-    assert isinstance(dataloader.sampler, StatefulDistributedSampler)
-    dataloader.sampler.set_start_index(start_index=sampler_start_idx)
-
-    for epoch in range(start_epoch, args.num_epochs):
-        dataloader.sampler.set_epoch(epoch=epoch)
-        if isinstance(plugin, HybridParallelPlugin) and plugin.pp_size > 1:
-            data_iter = iter(dataloader)
-            step_bar = tqdm(
-                range(len(dataloader)),
-                desc="Step",
-                disable=not (coordinator._local_rank == coordinator._world_size - 1),
-            )
-            with torch.autograd.profiler.profile(enabled=False) as prof:
-                for step in step_bar:
-                    # if step > 7:
-                    #     break
-                    performance_evaluator.on_step_start(step)
-                    outputs = booster.execute_pipeline(
-                        data_iter,
-                        model,
-                        criterion=lambda outputs, inputs: outputs[0],
-                        optimizer=optimizer,
-                        return_loss=True,
-                    )
-                    loss = outputs["loss"]
-                    optimizer.step()
-                    optimizer.zero_grad()
-                    performance_evaluator.on_step_end(loss, inputs_size=(args.batch_size, args.max_length), plugin=booster.plugin)
-
-                    # Save modeling.
-                    save_model_condition = args.save_interval > 0 and (step + 1) % args.save_interval == 0
-
-                    if not args.skip_save_each_epoch:
-                        save_model_condition = save_model_condition or (step + 1) == len(dataloader)
-
-                    if save_model_condition and not args.benchmark:
-                        coordinator.print_on_master("\nStart saving model checkpoint with running states")
-
-                        if args.use_neft:
-                            coordinator.print_on_master("Deactivate NEFTune before saving model.")
-                            deactivate_neftune(model, handle)
-
-                        accelerator.empty_cache()
-                        save_checkpoint(
-                            save_dir=args.save_dir,
-                            booster=booster,
-                            model=model,
-                            optimizer=optimizer,
-                            lr_scheduler=lr_scheduler,
-                            epoch=epoch,
-                            step=step + 1,
-                            batch_size=args.batch_size,
-                            coordinator=coordinator,
-                        )
-                        coordinator.print_on_master(
-                            f"Saved checkpoint at epoch {epoch} step {step + 1} at folder {args.save_dir}"
-                        )
-
-                        if args.use_neft:
-                            coordinator.print_on_master("Activate NEFTune.")
-                            model, handle = activate_neftune(model)
-            if prof:
-                prof.export_chrome_trace(f'torch_profile/rank{torch.distributed.get_rank()}_llama2_colo.json')
-        else:
-            pbar = tqdm(
-                desc=f"Epoch {epoch}",
-                disable=not coordinator.is_master(),
-                total=num_steps_per_epoch,
-                initial=start_step // args.accumulation_steps,
-            )
-            total_loss = torch.tensor(0.0, device=get_current_device())
-            for step, batch in enumerate(dataloader, start=start_step):
-                batch = {k: v.to(get_current_device()) for k, v in batch.items() if isinstance(v, torch.Tensor)}
-
-                batch_output = model(**batch)
-
-                loss = batch_output.loss / args.accumulation_steps
-                total_loss.add_(loss.data)
-
-                booster.backward(loss=loss, optimizer=optimizer)
-
-                if (step + 1) % args.accumulation_steps == 0:
-                    optimizer.step()
-                    lr_scheduler.step()
-                    optimizer.zero_grad()
-
-                    all_reduce_mean(tensor=total_loss)
-                    pbar.set_postfix({"Loss": f"{total_loss.item():.4f}"})
-                    if coordinator.is_master():
-                        global_step = (epoch * num_steps_per_epoch) + (step + 1) // args.accumulation_steps
-                        writer.add_scalar(tag="Loss", scalar_value=total_loss.item(), global_step=global_step)
-                        writer.add_scalar(
-                            tag="Learning Rate",
-                            scalar_value=lr_scheduler.get_last_lr()[0],
-                            global_step=global_step,
-                        )
-                    total_loss.fill_(0.0)
-                    pbar.update()
-
-                # Save modeling.
-                save_model_condition = (
-                    args.save_interval > 0 and (step + 1) % (args.save_interval * args.accumulation_steps) == 0
-                )
-
-                if not args.skip_save_each_epoch:
-                    save_model_condition = save_model_condition or (step + 1) == len(dataloader)
-
-                if save_model_condition and not args.benchmark:
-                    coordinator.print_on_master("\nStart saving model checkpoint with running states")
-
-                    if args.use_neft:
-                        coordinator.print_on_master("Deactivate NEFTune before saving model.")
-                        deactivate_neftune(model, handle)
-
-                    accelerator.empty_cache()
-                    save_checkpoint(
-                        save_dir=args.save_dir,
-                        booster=booster,
-                        model=model,
-                        optimizer=optimizer,
-                        lr_scheduler=lr_scheduler,
-                        epoch=epoch,
-                        step=step + 1,
-                        batch_size=args.batch_size,
-                        coordinator=coordinator,
-                    )
-                    coordinator.print_on_master(
-                        f"Saved checkpoint at epoch {epoch} step {step + 1} at folder {args.save_dir}"
-                    )
-
-                    if args.use_neft:
-                        coordinator.print_on_master("Activate NEFTune.")
-                        model, handle = activate_neftune(model)
-
-                # Delete cache.
-                # del batch, batch_labels, batch_output, loss
-                accelerator.empty_cache()
-
-        # Subsequent epochs are not resumed from a checkpoint, so reset the sampler start index and the start step.
-        dataloader.sampler.set_start_index(start_index=0)
-        start_step = 0
-
-    if args.use_neft:
-        coordinator.print_on_master("Deactivate NEFTune.")
-        deactivate_neftune(model, handle)
-
-    # Final save.
-    if not args.benchmark:
-        coordinator.print_on_master("Start saving final model checkpoint")
-        booster.save_model(model, os.path.join(args.save_dir, "modeling"), shard=True)
-        coordinator.print_on_master(f"Saved final model checkpoint at epoch {epoch} at folder {args.save_dir}")
-
-    coordinator.print_on_master(f"Max device memory usage: {accelerator.max_memory_allocated()/1024**2:.2f} MB")
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser()
-    # Basic training information.
-    parser.add_argument(
-        "--pretrained",
-        type=str,
-        default=None,
-        help="Path to the pre-trained model",
-    )
-    parser.add_argument("--load_checkpoint", type=str, default=None, help="Load a checkpoint to continue training.")
-    parser.add_argument("--dataset", type=str, default='')
-    parser.add_argument(
-        "--plugin",
-        type=str,
-        default="gemini",
-        choices=["gemini", "gemini_auto", "zero2", "zero2_cpu", "3d", "ddp"],
-        help="Choose which plugin to use",
-    )
-    parser.add_argument("--save_interval", type=int, default=1000, help="Save interval")
-    parser.add_argument("--save_dir", type=str, default="checkpoint_dir", help="Checkpoint directory")
-    parser.add_argument("--tensorboard_dir", type=str, default="logs_dir", help="Tensorboard directory")
-    parser.add_argument("--config_file", type=str, default="config_file", help="Config file")
-    # Training parameters
-    parser.add_argument("--num_epochs", type=int, default=1, help="Number of training epochs")
-    parser.add_argument("--accumulation_steps", type=int, default=1, help="Number of accumulation steps")
-    parser.add_argument("--batch_size", type=int, default=2, help="Global batch size of each process")
-    parser.add_argument("--lr", type=float, default=3e-4, help="Learning rate")
-    parser.add_argument("--max_length", type=int, default=8192, help="Model max length")
-    parser.add_argument(
-        "--mixed_precision",
-        type=str,
-        default="fp16",
-        choices=["fp16", "bf16"],
-        help="Mixed precision",
-    )
-    parser.add_argument("--grad_clip", type=float, default=1.0, help="Gradient clipping value")
-    parser.add_argument("--weight_decay", type=float, default=0.1, help="Weight decay")
-    parser.add_argument("--warmup_steps", type=int, default=None, help="Warmup steps")
-    parser.add_argument(
-        "--use_grad_checkpoint",
-        action="store_true",
-        default=False,
-        help="Use gradient checkpointing",
-    )
-    parser.add_argument(
-        "--use_flash_attn",
-        action="store_true",
-        default=False,
-        help="Use flash-attention",
-    )
-    parser.add_argument(
-        "--use_neft",
-        action="store_true",
-        default=False,
-        help="Use NEFTune",
-    )
-    parser.add_argument(
-        "--freeze_non_embeds_params",
-        action="store_true",
-        default=False,
-        help="Freeze non-embedding parameters",
-    )
-    parser.add_argument("--pad_token", choices=["eos", "unk"], default="eos")
-    parser.add_argument("--padding_mode", choices=["max_length", "longest"], default="max_length")
-    parser.add_argument(
-        "--skip_save_each_epoch",
-        action="store_true",
-        default=False,
-        help="Skip saving the model checkpoint after each epoch is completed.",
-    )
-
-    # Additional arguments for 3d plugin.
-    parser.add_argument("--tp", type=int, default=1, help="TP size, used for 3d plugin.")
-    parser.add_argument("--pp", type=int, default=1, help="PP size, used for 3d plugin.")
-    parser.add_argument("--sp", type=int, default=1, help="SP size, used for 3d plugin.")
-    parser.add_argument("--zero_stage", type=int, default=0, help="Zero stage, used for 3d plugin.", choices=[0, 1, 2])
-    parser.add_argument(
-        "--sp_mode",
-        type=str,
-        default="split_gather",
-        choices=["split_gather", "ring", "all_to_all"],
-        help="SP mode, used for 3d plugin.",
-    )
-    parser.add_argument(
-        "--enable_sequence_parallelism",
-        default=False,
-        action="store_true",
-        help="Whether to enable SP, used for 3d plugin.",
-    )
-    parser.add_argument(
-        "--zero_cpu_offload", default=False, action="store_true", help="Whether to use offloading, used for 3d plugin."
-    )
-    parser.add_argument(
-        "--microbatch_size", type=int, default=1, help="Batch size for each process in PP, used for 3d plugin."
-    )
-
-    # Additional arguments for benchmark.
-    parser.add_argument("--num_samples", type=int, default=500, help="Number of samples for benchmarking.")
-    parser.add_argument(
-        "--benchmark", action="store_true", default=False, help="Benchmark performance using random dataset."
-    )
-    parser.add_argument("--tokenizer_dir", type=str, default="", help="Path to the LLaMA tokenizer")
-    parser.add_argument("--config", type=str, default="7b", help="Model configuration")
-    parser.add_argument("--custom_ckpt", action="store_true", help="Customize checkpoint", default=False)
-    parser.add_argument('--custom_recompute_layers_per_stage', nargs='*', type=int, default=None,
-                        help='Custom number of recompute layers for each PP stage; the number of values should equal the PP size.')
-    parser.add_argument("--ignore_steps", type=int, default=2, help="Number of steps to ignore")
-    parser.add_argument("--use_ixformer_mlp", action="store_true", help="Use the ixformer MLP implementation", default=False)
-    parser.add_argument("--use_colo_llamaflashatten", action="store_true", help="Use ColossalAI's LLaMA flash attention", default=False)
-    parser.add_argument("--use_ixformer_fusedrmsnormres", action="store_true", help="Fuse the residual add and weight-gradient accumulation in RMSNorm", default=False)
-
-    args = parser.parse_args()
-    train(args)
diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/train_sft.example.sh b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/train_sft.example.sh
deleted file mode 100644
index d87f9ef82f4ff5942f9952df4afc95d3ca13e53f..0000000000000000000000000000000000000000
--- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/train_sft.example.sh
+++ /dev/null
@@ -1,47 +0,0 @@
-#!/bin/bash
-
-# NCCL IB environment variables
-export NCCL_IB_HCA=mlx5_1:1,mlx5_2:1,mlx5_3:1,mlx5_4:1
-export NCCL_IB_DISABLE=0
-export NCCL_SOCKET_IFNAME=eth0
-export NCCL_IB_GID_INDEX=3
-export NCCL_IB_TIMEOUT=23
-export NCCL_IB_RETRY_CNT=7
-export OMP_NUM_THREADS=8
-
-PROJECT_NAME=""
-PARENT_SAVE_DIR=""
-PARENT_TENSORBOARD_DIR=""
-PARENT_CONFIG_FILE=""
-PRETRAINED_MODEL_PATH=""
-
-declare -a dataset=(
-    "PATH TO THE DATASET"
-)
-
-TIMESTAMP=$(date +%Y-%m-%d-%H-%M-%S)
-FULL_PROJECT_NAME="${PROJECT_NAME}-${TIMESTAMP}"
-SAVE_DIR="${PARENT_SAVE_DIR}${FULL_PROJECT_NAME}"
-TENSORBOARD_DIR="${PARENT_TENSORBOARD_DIR}${FULL_PROJECT_NAME}"
-CONFIG_FILE="${PARENT_CONFIG_FILE}${FULL_PROJECT_NAME}.json"
-
-colossalai run --nproc_per_node 8 --hostfile hostfile --master_port 30013 train.py \
-    --pretrained $PRETRAINED_MODEL_PATH \
-    --dataset ${dataset[@]} \
-    --plugin "zero2" \
-    --save_interval 400 \
-    --save_dir $SAVE_DIR \
-    --tensorboard_dir $TENSORBOARD_DIR \
-    --config_file $CONFIG_FILE \
-    --num_epochs 1 \
-    --accumulation_steps 8 \
-    --micro_batch_size 8 \
-    --lr 5e-5 \
-    --mixed_precision "bf16" \
-    --grad_clip 1.0 \
-    --weight_decay 0.01 \
-    --warmup_steps 100 \
-    --use_grad_checkpoint \
-    --use_flash_attn \
-    --use_neft \
-    --pad_token "eos"
diff --git a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/version.txt b/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/version.txt
deleted file mode 100644
index 9084fa2f716a7117829f3f32a5f4cef400e02903..0000000000000000000000000000000000000000
--- a/nlp/llm/llama3_8b/colossalai/applications/Colossal-LLaMA/version.txt
+++ /dev/null
@@ -1 +0,0 @@
-1.1.0
diff --git a/toolbox/DeepSpeed/install_toolbox_deepspeed.sh b/toolbox/DeepSpeed/install_toolbox_deepspeed.sh
deleted file mode 100644
index 5e17e3d0184a8291ba567d4489ec21a39edf0b07..0000000000000000000000000000000000000000
--- a/toolbox/DeepSpeed/install_toolbox_deepspeed.sh
+++ /dev/null
@@ -1,23 +0,0 @@
-# Copyright (c) 2023, Shanghai Iluvatar CoreX Semiconductor Co., Ltd.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-set -x
-git clone -b v0.9.2 https://github.com/microsoft/DeepSpeed.git
-cp -r -T patch/ DeepSpeed/
-cd DeepSpeed/
-yum install -y libaio libaio-devel
-bash clean_deepspeed.sh
-bash build_deepspeed.sh
-bash install_deepspeed.sh