import os
from pathlib import Path
from typing import List, Optional, Tuple, Type, Union
import torch
import torch.distributed as dist
from omegaconf import ListConfig
from torch import Tensor
from torch.optim import Optimizer
from torch.optim.lr_scheduler import LambdaLR, SequentialLR, _LRScheduler
from yolo.config.config import IDX_TO_ID, NMSConfig, OptimizerConfig, SchedulerConfig
from yolo.model.yolo import YOLO
from yolo.utils.bounding_box_utils import bbox_nms, transform_bbox
from yolo.utils.logger import logger

class ExponentialMovingAverage:
    """Maintain an exponential moving average (shadow copy) of the model parameters."""

    def __init__(self, model: torch.nn.Module, decay: float):
        self.model = model
        self.decay = decay
        self.shadow = {name: param.clone().detach() for name, param in model.named_parameters()}
        self.backup = {}

    def update(self):
        """Update the shadow parameters using the current model parameters."""
        for name, param in self.model.named_parameters():
            assert name in self.shadow, "All model parameters should have a corresponding shadow parameter."
            new_average = (1.0 - self.decay) * param.data + self.decay * self.shadow[name]
            self.shadow[name] = new_average.clone()

    def apply_shadow(self):
        """Back up the current parameters, then copy the shadow parameters into the model."""
        for name, param in self.model.named_parameters():
            self.backup[name] = param.data.clone()
            param.data.copy_(self.shadow[name])

    def restore(self):
        """Restore the original parameters saved by `apply_shadow`."""
        for name, param in self.model.named_parameters():
            param.data.copy_(self.backup[name])
        self.backup = {}
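
# Illustrative usage sketch (hypothetical `model`, `train_loader`, `evaluate`):
# the shadow weights are typically updated once per optimizer step and swapped
# in only for validation.
#
#     ema = ExponentialMovingAverage(model, decay=0.9999)
#     for images, targets in train_loader:
#         ...                 # forward / backward / optimizer.step()
#         ema.update()        # fold the new weights into the running average
#     ema.apply_shadow()      # validate with the averaged weights
#     evaluate(model)
#     ema.restore()           # resume training from the raw weights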

def create_optimizer(model: YOLO, optim_cfg: OptimizerConfig) -> Optimizer:
    """Create an optimizer for the given model parameters based on the configuration.

    Returns:
        An instance of the optimizer configured according to the provided settings.
    """
    optimizer_class: Type[Optimizer] = getattr(torch.optim, optim_cfg.type)

    # Split the parameters into three groups so that weight decay is applied
    # only to convolution weights, never to biases or BatchNorm weights.
    bias_params = [p for name, p in model.named_parameters() if "bias" in name]
    norm_params = [p for name, p in model.named_parameters() if "weight" in name and "bn" in name]
    conv_params = [p for name, p in model.named_parameters() if "weight" in name and "bn" not in name]

    model_parameters = [
        {"params": bias_params, "momentum": 0.8, "weight_decay": 0},
        {"params": conv_params, "momentum": 0.8},
        {"params": norm_params, "momentum": 0.8, "weight_decay": 0},
    ]

    def next_epoch(self, batch_num):
        # At each epoch boundary, remember the learning rates the scheduler set
        # for this epoch and interpolate toward them batch by batch.
        self.min_lr = self.max_lr
        self.max_lr = [param["lr"] for param in self.param_groups]
        self.batch_num = batch_num
        self.batch_idx = 0

    def next_batch(self):
        # Linearly interpolate each group's learning rate within the epoch.
        self.batch_idx += 1
        for lr_idx, param_group in enumerate(self.param_groups):
            min_lr, max_lr = self.min_lr[lr_idx], self.max_lr[lr_idx]
            param_group["lr"] = min_lr + (self.batch_idx) * (max_lr - min_lr) / self.batch_num

    # Attach the per-batch warm-up helpers. Note that this monkey-patches the
    # optimizer class itself, so it affects every instance of that class.
    optimizer_class.next_batch = next_batch
    optimizer_class.next_epoch = next_epoch

    optimizer = optimizer_class(model_parameters, **optim_cfg.args)
    # Seed max_lr so the first next_epoch call has values to interpolate from.
    optimizer.max_lr = [0.1, 0, 0]
    return optimizer
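
# Illustrative training-loop sketch (hypothetical names): the patched helpers
# are intended to be called once per epoch and once per batch so the learning
# rate ramps smoothly between scheduler steps.
#
#     optimizer = create_optimizer(model, optim_cfg)
#     for epoch in range(num_epochs):
#         optimizer.next_epoch(len(train_loader))
#         for batch in train_loader:
#             optimizer.next_batch()   # interpolate each group's LR for this batch
#             ...                      # forward / backward / optimizer.step()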

def create_scheduler(optimizer: Optimizer, schedule_cfg: SchedulerConfig) -> _LRScheduler:
    """Create a learning rate scheduler for the given optimizer based on the configuration.

    Returns:
        An instance of the scheduler configured according to the provided settings.
    """
    scheduler_class: Type[_LRScheduler] = getattr(torch.optim.lr_scheduler, schedule_cfg.type)
    schedule = scheduler_class(optimizer, **schedule_cfg.args)
    if hasattr(schedule_cfg, "warmup"):
        wepoch = schedule_cfg.warmup.epochs
        # lambda1 ramps a group's LR linearly up to its base value over the
        # warm-up epochs; lambda2 starts the bias group near 10x and decays it
        # to 1x. Param-group order matches create_optimizer: [bias, conv, norm].
        lambda1 = lambda epoch: (epoch + 1) / wepoch if epoch < wepoch else 1
        lambda2 = lambda epoch: 10 - 9 * ((epoch + 1) / wepoch) if epoch < wepoch else 1
        warmup_schedule = LambdaLR(optimizer, lr_lambda=[lambda2, lambda1, lambda1])
        # Hand control to the main scheduler once the warm-up epochs finish.
        schedule = SequentialLR(optimizer, schedulers=[warmup_schedule, schedule], milestones=[wepoch])
    return schedule
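
# Illustrative usage sketch (hypothetical config values): a cosine schedule
# preceded by a 3-epoch linear warm-up, stepped once per epoch.
#
#     # schedule_cfg.type = "CosineAnnealingLR", schedule_cfg.args = {"T_max": 100},
#     # schedule_cfg.warmup.epochs = 3
#     scheduler = create_scheduler(optimizer, schedule_cfg)
#     for epoch in range(num_epochs):
#         ...                 # train one epoch
#         scheduler.step()    # advances the warm-up first, then the main schedule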

def initialize_distributed() -> int:
    """Initialize the NCCL process group from torchrun-style environment variables and return the local rank."""
    rank = int(os.getenv("RANK", "0"))
    local_rank = int(os.getenv("LOCAL_RANK", "0"))
    world_size = int(os.getenv("WORLD_SIZE", "1"))
    torch.cuda.set_device(local_rank)
    dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)
    logger.info(f"🔢 Initialized process group; rank: {rank}, size: {world_size}")
    return local_rank
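
# Note: the RANK / LOCAL_RANK / WORLD_SIZE variables read above are the ones
# torchrun exports to each worker, e.g. (train.py is a hypothetical entry point):
#
#     torchrun --nproc_per_node=2 train.py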

def get_device(device_spec: Union[str, int, List[int]]) -> Tuple[torch.device, bool]:
    """Resolve a device specification to a torch.device plus a flag indicating DDP mode."""
    ddp_flag = False
    if isinstance(device_spec, (list, ListConfig)):
        # A list of devices means distributed training: spin up the process
        # group and pin this process to its local GPU.
        ddp_flag = True
        device_spec = initialize_distributed()
    if torch.cuda.is_available() and "cuda" in str(device_spec):
        return torch.device(device_spec), ddp_flag
    if not torch.cuda.is_available():
        if device_spec != "cpu":
            logger.warning(f"❎ Device spec: {device_spec} is not supported; falling back to CPU")
        return torch.device("cpu"), False
    device = torch.device(device_spec)
    return device, ddp_flag
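
# Illustrative behaviour sketch of get_device under this logic:
#
#     get_device("cuda:0")  # -> (torch.device("cuda:0"), False) when CUDA is available
#     get_device("cpu")     # -> (torch.device("cpu"), False)
#     get_device([0, 1])    # -> initializes DDP, returns (local CUDA device, True)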

class PostProcess:
    """Rescale predictions back to the original image coordinates and run NMS on the predicted boxes."""

    def __init__(self, converter, nms_cfg: NMSConfig) -> None:
        self.converter = converter
        self.nms = nms_cfg

    def __call__(self, predict, rev_tensor: Optional[Tensor] = None) -> List[Tensor]:
        prediction = self.converter(predict["Main"])
        pred_class, _, pred_bbox = prediction[:3]
        # Some heads emit a separate confidence tensor as a fourth output.
        pred_conf = prediction[3] if len(prediction) == 4 else None
        if rev_tensor is not None:
            # Undo the letterbox transform: subtract the shift, divide by the scale.
            pred_bbox = (pred_bbox - rev_tensor[:, None, 1:]) / rev_tensor[:, 0:1, None]
        pred_bbox = bbox_nms(pred_class, pred_bbox, self.nms, pred_conf)
        return pred_bbox
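
# Illustrative inference sketch (hypothetical `converter`, `nms_cfg`, and model call):
#
#     post_process = PostProcess(converter, nms_cfg)
#     predict = model(images)                     # raw head outputs
#     boxes = post_process(predict, rev_tensor)   # per-image NMS-filtered boxes
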
def collect_prediction(predict_json: List, local_rank: int) -> List:
"""
Collects predictions from all distributed processes and gathers them on the main process (rank 0).
Args:
predict_json (List): The prediction data (can be of any type) generated by the current process.
local_rank (int): The rank of the current process. Typically, rank 0 is the main process.
Returns:
List: The combined list of predictions from all processes if on rank 0, otherwise predict_json.
"""
if dist.is_initialized() and local_rank == 0:
all_predictions = [None for _ in range(dist.get_world_size())]
dist.gather_object(predict_json, all_predictions, dst=0)
predict_json = [item for sublist in all_predictions for item in sublist]
elif dist.is_initialized():
dist.gather_object(predict_json, None, dst=0)
return predict_json
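
# Illustrative DDP evaluation sketch (hypothetical names): every rank collects
# its own predictions, then rank 0 gathers the full list before scoring.
#
#     predict_json = []
#     for img_paths, predicts, rev_tensor in val_results:
#         predict_json += predicts_to_json(img_paths, predicts, rev_tensor)
#     predict_json = collect_prediction(predict_json, local_rank)
#     if local_rank == 0:
#         score(predict_json)   # only rank 0 holds every rank's detections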

def predicts_to_json(img_paths, predicts, rev_tensor):
    """Convert a batch of image paths and per-image predictions (an n x 6 tensor of
    class, x1, y1, x2, y2, confidence) into a list of COCO-style detection dicts.
    """
    batch_json = []
    for img_path, bboxes, box_reverse in zip(img_paths, predicts, rev_tensor):
        # Undo the letterbox transform, then convert xyxy corners to xywh.
        scale, shift = box_reverse.split([1, 4])
        bboxes = bboxes.clone()
        bboxes[:, 1:5] = (bboxes[:, 1:5] - shift[None]) / scale[None]
        bboxes[:, 1:5] = transform_bbox(bboxes[:, 1:5], "xyxy -> xywh")
        for cls, *pos, conf in bboxes:
            bbox = {
                # COCO convention: the numeric file stem serves as the image id.
                "image_id": int(Path(img_path).stem),
                "category_id": IDX_TO_ID[int(cls)],
                "bbox": [float(p) for p in pos],
                "score": float(conf),
            }
            batch_json.append(bbox)
    return batch_json
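
# Illustrative shape of one resulting entry (hypothetical values):
#
#     {"image_id": 139, "category_id": 1, "bbox": [412.0, 157.0, 53.0, 138.0], "score": 0.91}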