from __future__ import absolute_import, division, print_function, unicode_literals
import math
import warnings
from abc import ABCMeta, abstractmethod
from functools import partial
import torch
import torch.nn as nn
from torch._jit_internal import List, Optional
class _PartialWrapper(object):
def __init__(self, p):
self.p = p
def __call__(self, *args, **keywords):
return self.p(*args, **keywords)
def __repr__(self):
return self.p.__repr__()
def _with_args(cls_or_self, **kwargs):
"""
Wrapper around functools.partial that allows chaining.
Often you want to assign it to a class as a class method:
Foo.with_args = classmethod(_with_args)
Foo.with_args(x=1).with_args(y=2)
"""
r = _PartialWrapper(partial(cls_or_self, **kwargs))
return r
_PartialWrapper.with_args = _with_args
ABC = ABCMeta(str("ABC"), (object,), {}) # compatible with Python 2 *and* 3:
[docs]class Observer(ABC, nn.Module):
r"""
Observer base Module. Any observer implementation should derive from this class.
Concrete observers should follow the same API. In forward, they will update
the statistics of the observed Tensor. And they should provide a
`calculate_qparams` function that computes the quantization parameters given
the collected statistics.
"""
def __init__(self, dtype):
super(Observer, self).__init__()
self.dtype = dtype
@abstractmethod
def forward(self, x):
pass
@abstractmethod
def calculate_qparams(self, **kwargs):
pass
with_args = classmethod(_with_args)
class _ObserverBase(Observer):
r"""
Common base for all qint/quint8 observers
"""
def __init__(
self, dtype=torch.quint8, qscheme=torch.per_tensor_affine, reduce_range=False
):
super(_ObserverBase, self).__init__(dtype=dtype)
self.qscheme = qscheme
self.reduce_range = reduce_range
self.eps = torch.finfo(torch.float32).eps
assert self.qscheme in (
torch.per_tensor_affine,
torch.per_tensor_symmetric,
torch.per_channel_affine,
torch.per_channel_symmetric,
), "Default Observer only works for per_tensor_affine, \
per_tensor_symmetric, per_channel_affine and \
per_channel_symmetric quantization scheme"
assert self.dtype in (
torch.qint8,
torch.quint8,
), "Default Observer only works for qint8 and quint8 data type"
def _calculate_per_channel_qparams(self, min_vals, max_vals):
# type: (Optional[Tensor], Optional[Tensor]) -> Tuple[Tensor, Tensor]
"""
Given min and max value tensors, this function calculates per channel
quantization parameters
"""
if min_vals is None or max_vals is None:
warnings.warn(
"must run observer before calling calculate_qparams.\
Returning default scale and zero point "
)
return torch.tensor([1.0]), torch.tensor([0])
for i in range(len(min_vals)):
assert (
min_vals[i] <= max_vals[i]
), "min {} should be less than max {}".format(min_vals[i], max_vals[i])
scales = torch.empty(min_vals.size(), dtype=torch.float32)
zero_points = torch.empty(min_vals.size(), dtype=torch.int64)
for i in range(len(scales)):
qparam = self._calculate_qparams(
min_vals[i], max_vals[i]
)
scales[i] = float(qparam[0])
zero_points[i] = int(qparam[1])
return scales, zero_points
def _calculate_qparams(self, min_val, max_val):
# type: (Optional[Tensor], Optional[Tensor]) -> Tuple[Tensor, Tensor]
"""
Given min and max values, this function calculates quantization parameters
"""
if max_val is None or min_val is None:
warnings.warn(
"must run observer before calling calculate_qparams.\
Returning default scale and zero point "
)
return torch.tensor([1.0]), torch.tensor([0])
assert min_val <= max_val, "min {} should be less than max {}".format(
min_val, max_val
)
if self.dtype == torch.qint8:
if self.reduce_range:
qmin, qmax = -64, 63
else:
qmin, qmax = -128, 127
else:
if self.reduce_range:
qmin, qmax = 0, 127
else:
qmin, qmax = 0, 255
max_val, min_val = float(max_val), float(min_val)
min_val = min(0.0, min_val)
max_val = max(0.0, max_val)
if max_val == min_val:
scale = 1.0
zero_point = 0
else:
if self.qscheme == torch.per_tensor_symmetric or self.qscheme == torch.per_channel_symmetric:
max_val = max(-min_val, max_val)
scale = max_val / ((qmax - qmin) / 2)
scale = max(scale, self.eps)
zero_point = 0 if self.dtype == torch.qint8 else 128
else:
scale = (max_val - min_val) / float(qmax - qmin)
scale = max(scale, self.eps)
zero_point = qmin - round(min_val / scale)
zero_point = max(qmin, zero_point)
zero_point = min(qmax, zero_point)
zero_point = int(zero_point)
return torch.tensor([scale]), torch.tensor([zero_point])
[docs]class MinMaxObserver(_ObserverBase):
r"""Default Observer Module
A default implementation of the observer module, only works for
`per_tensor_affine` quantization scheme. The module will record the
running average of max and min value of the observed Tensor and
calculate_qparams will calculate scale and zero_point
"""
__annotations__ = {
"min_val": Optional[torch.Tensor],
"max_val": Optional[torch.Tensor],
}
def __init__(self, **kwargs):
# For x86 quantized kernels, we need to ensure that the vpmaddubsw instruction
# does not overflow. We allow for a reduce_range argument to observers that
# reduces the quantized range to (0,127) or (-64, 63). For more details see
# aten/src/ATen/native/quantized/cpu/qconv.cpp
# This is not the optimal choice for non x86 backends as
# lose a bit of precision for activations.
super(MinMaxObserver, self).__init__(**kwargs)
self.min_val = None
self.max_val = None
if (
self.qscheme == torch.per_tensor_symmetric
and self.reduce_range
and self.dtype == torch.quint8
):
raise NotImplementedError(
"Cannot reduce range for symmetric quantization for quint8"
)
def forward(self, x_orig):
x = x_orig.detach() # avoid keeping autograd tape
min_val = self.min_val
max_val = self.max_val
if min_val is None or max_val is None:
min_val = torch.min(x)
max_val = torch.max(x)
else:
min_val = torch.min(torch.min(x), min_val)
max_val = torch.max(torch.max(x), max_val)
self.min_val = min_val
self.max_val = max_val
return x_orig
@torch.jit.export
def calculate_qparams(self):
return self._calculate_qparams(self.min_val, self.max_val)
@torch.jit.export
def extra_repr(self):
return "min_val={}, max_val={}".format(self.min_val, self.max_val)
def _save_to_state_dict(self, destination, prefix, keep_vars):
super(MinMaxObserver, self)._save_to_state_dict(destination, prefix, keep_vars)
destination[prefix + 'min_val'] = self.min_val
destination[prefix + 'max_val'] = self.max_val
def _load_from_state_dict(self, state_dict, prefix, local_metadata, strict,
missing_keys, unexpected_keys, error_msgs):
self.min_val = state_dict.pop(prefix + 'min_val')
self.max_val = state_dict.pop(prefix + 'max_val')
super(MinMaxObserver, self)._load_from_state_dict(state_dict, prefix, local_metadata, False,
missing_keys, unexpected_keys, error_msgs)
class MovingAverageMinMaxObserver(MinMaxObserver):
def __init__(self, averaging_constant=0.01, **kwargs):
self.averaging_constant = averaging_constant
super(MovingAverageMinMaxObserver, self).__init__(**kwargs)
def forward(self, x_orig):
x = x_orig.detach() # avoid keeping autograd tape
min_val = self.min_val
max_val = self.max_val
if min_val is None or max_val is None:
min_val = torch.min(x)
max_val = torch.max(x)
else:
min_val = min_val + self.averaging_constant * (torch.min(x) - min_val)
max_val = max_val + self.averaging_constant * (torch.max(x) - max_val)
self.min_val = min_val
self.max_val = max_val
return x_orig
[docs]class PerChannelMinMaxObserver(_ObserverBase):
r"""Per Channel Observer Module
The module will record the running average of max and min value for each
channel of the observed Tensor and calculate_qparams will calculate
scales and zero_points for each channel
"""
def __init__(self, ch_axis=0, **kwargs):
super(PerChannelMinMaxObserver, self).__init__(**kwargs)
self.ch_axis = ch_axis
self.register_buffer('min_vals', None)
self.register_buffer('max_vals', None)
if (
self.qscheme == torch.per_channel_symmetric
and self.reduce_range
and self.dtype == torch.quint8
):
raise NotImplementedError(
"Cannot reduce range for symmetric quantization for quint8"
)
def forward(self, x_orig):
x = x_orig.detach() # avoid keeping autograd tape
min_vals = self.min_vals
max_vals = self.max_vals
x_dim = x.size()
new_axis_list = list(range(len(x_dim)))
new_axis_list[self.ch_axis] = 0
new_axis_list[0] = self.ch_axis
y = x.permute(tuple(new_axis_list))
y = torch.flatten(y, start_dim=1)
if min_vals is None or max_vals is None:
min_vals = torch.min(y, 1)[0]
max_vals = torch.max(y, 1)[0]
else:
min_vals = torch.min(torch.min(y, 1)[0], min_vals)
max_vals = torch.max(torch.max(y, 1)[0], max_vals)
self.min_vals = min_vals
self.max_vals = max_vals
return x_orig
def calculate_qparams(self):
return self._calculate_per_channel_qparams(self.min_vals, self.max_vals)
def extra_repr(self):
return "min_val={}, max_val={}".format(self.min_vals, self.max_vals)
def _load_from_state_dict(self, state_dict, prefix, local_metadata, strict,
missing_keys, unexpected_keys, error_msgs):
# We have to handle min_vals and max_vals manually even though they are registered as buffers
# as they are initialized to None
self.min_vals = state_dict.pop(prefix + 'min_vals')
self.max_vals = state_dict.pop(prefix + 'max_vals')
super(PerChannelMinMaxObserver, self)._load_from_state_dict(state_dict, prefix, local_metadata, False,
missing_keys, unexpected_keys, error_msgs)
[docs]class MovingAveragePerChannelMinMaxObserver(PerChannelMinMaxObserver):
r"""Per Channel Observer Module
The module will record the running average of max and min value for each
channel of the observed Tensor and calculate_qparams will calculate
scales and zero_points for each channel
"""
def __init__(self, averaging_constant=0.01, **kwargs):
self.averaging_constant = averaging_constant
super(MovingAveragePerChannelMinMaxObserver, self).__init__(**kwargs)
def forward(self, x_orig):
x = x_orig.detach() # avoid keeping autograd tape
min_vals = self.min_vals
max_vals = self.max_vals
x_dim = x.size()
new_axis_list = list(range(len(x_dim)))
new_axis_list[self.ch_axis] = 0
new_axis_list[0] = self.ch_axis
y = x.permute(tuple(new_axis_list))
y = torch.flatten(y, start_dim=1)
if min_vals is None or max_vals is None:
min_vals = torch.min(y, 1)[0]
max_vals = torch.max(y, 1)[0]
else:
min_vals = min_vals + self.averaging_constant * (torch.min(y, 1)[0] - min_vals)
max_vals = max_vals + self.averaging_constant * (torch.max(y, 1)[0] - max_vals)
self.min_vals = min_vals
self.max_vals = max_vals
return x_orig
[docs]class HistogramObserver(_ObserverBase):
r"""
The module records the running histogram of tensor values along with
min/max values. calculate_qparams will calculate scale and zero_point
"""
__annotations__ = {
"min_val": Optional[torch.Tensor],
"max_val": Optional[torch.Tensor],
}
def __init__(self, bins=2048, **kwargs):
# bins: The number of bins used for histogram calculation.
super(HistogramObserver, self).__init__(**kwargs)
self.bins = bins
self.register_buffer('histogram', torch.zeros(self.bins))
self.min_val = None
self.max_val = None
@staticmethod
def _get_norm(delta_begin, delta_end, density, norm_type):
"""
Compute the norm of the values uniformaly distributed between
delta_begin and delta_end.
norm = density * (integral_{begin, end} x^2)
= density * (end^3 - begin^3) / 3
"""
assert norm_type == "L2", "Only L2 norms are currently supported"
norm = 0.0
if norm_type == "L2":
norm = (
delta_end * delta_end * delta_end
- delta_begin * delta_begin * delta_begin
) / 3
return density * norm
def _compute_quantization_error(self, next_start_bin, next_end_bin, norm_type):
"""
Compute the quantization error if we use start_bin to end_bin as the
min and max to do the quantization.
"""
dst_nbins = 2 ** torch.iinfo(self.dtype).bits
bin_width = (self.max_val.item() - self.min_val.item()) / self.bins
norm = 0.0
dst_bin_width = bin_width * (next_end_bin - next_start_bin + 1) / dst_nbins
if dst_bin_width == 0.0:
return 0.0
for src_bin in range(self.bins):
# distances from the beginning of first dst_bin to the beginning and
# end of src_bin
src_bin_begin = (src_bin - next_start_bin) * bin_width
src_bin_end = src_bin_begin + bin_width
# which dst_bins the beginning and end of src_bin belong to?
dst_bin_of_begin = min(
dst_nbins - 1, max(0.0, math.floor(src_bin_begin / dst_bin_width))
)
dst_bin_of_end = min(
dst_nbins - 1, max(0.0, math.floor(src_bin_end / dst_bin_width))
)
dst_bin_of_begin_center = (
dst_bin_of_begin * dst_bin_width + dst_bin_width / 2
)
density = self.histogram[src_bin] / bin_width
if dst_bin_of_begin == dst_bin_of_end:
# if src_bin is entirely within 1 dst_bin
delta_begin = src_bin_begin - dst_bin_of_begin_center
delta_end = src_bin_end - dst_bin_of_begin_center
norm = norm + self._get_norm(delta_begin, delta_end, density, norm_type)
else:
delta_begin = src_bin_begin - dst_bin_of_begin_center
delta_end = dst_bin_width / 2
norm = norm + self._get_norm(delta_begin, delta_end, density, norm_type)
norm = norm + (dst_bin_of_end - dst_bin_of_begin - 1) * self._get_norm(
-dst_bin_width / 2, dst_bin_width / 2, density, norm_type
)
dst_bin_of_end_center = (
dst_bin_of_end * dst_bin_width + dst_bin_width / 2
)
delta_begin = -dst_bin_width / 2
delta_end = src_bin_end - dst_bin_of_end_center
norm = norm + self._get_norm(delta_begin, delta_end, density, norm_type)
return norm
def _non_linear_param_search(self):
"""
An approximation for L2 error minimization for selecting min/max.
By selecting new min/max, we filter out outliers in input distribution.
This follows the implementation of NormMinimization::NonlinearQuantizationParamsSearch in
caffe2/quantization/server/norm_minimization.cc
"""
assert self.histogram.size()[0] == self.bins, "bins mistmatch"
bin_width = (self.max_val - self.min_val) / self.bins
# cumulative sum
total = sum(self.histogram)
cSum = torch.cumsum(self.histogram, dim=0)
stepsize = 1e-5
alpha = 0.0
beta = 1.0
start_bin = 0
end_bin = self.bins - 1
norm_min = float("inf")
while alpha < beta:
next_alpha = alpha + stepsize
next_beta = beta - stepsize
# find the left and right bins between the quantile bounds
l = start_bin
r = end_bin
while l < end_bin and cSum[l] < next_alpha * total:
l = l + 1
while r > start_bin and cSum[r] > next_beta * total:
r = r - 1
next_start_bin = start_bin
next_end_bin = end_bin
if (l - start_bin) > (end_bin - r):
next_start_bin = l
alpha = next_alpha
else:
next_end_bin = r
beta = next_beta
if next_start_bin == start_bin and next_end_bin == end_bin:
continue
# calculate the quantization error using next_start_bin and next_end_bin
norm = self._compute_quantization_error(next_start_bin, next_end_bin, "L2")
if norm > norm_min:
break
norm_min = norm
start_bin = next_start_bin
end_bin = next_end_bin
new_min = self.min_val + bin_width * start_bin
new_max = self.min_val + bin_width * (end_bin + 1)
return new_min, new_max
def _combine_histograms(
self, dst_histogram, dst_min, dst_max, src_histogram, src_min, src_max
):
bins_dst = dst_histogram.size()[0]
bins_src = src_histogram.size()[0]
dst_bin_width = (dst_max - dst_min) / bins_dst
src_bin_width = (src_max - src_min) / bins_src
for i in range(bins_src):
src_bin_count = src_histogram[i].item()
if src_bin_count == 0:
continue
src_bin_begin = src_min + src_bin_width * i
src_bin_end = src_bin_begin + src_bin_width
dst_bin = 0
if dst_bin_width:
dst_bin = int((src_bin_begin - dst_min) / dst_bin_width)
dst_bin_begin = dst_min + dst_bin_width * dst_bin
dst_bin_end = dst_bin_begin + dst_bin_width
dst_bin2 = 0
if dst_bin_width:
dst_bin2 = min(
int((src_bin_end - dst_min) / dst_bin_width), bins_dst - 1
)
assert dst_bin2 <= dst_bin + 2, "1 src_bin is mapped to at most 2 dst_bins"
# dst_bin_cnt is the count from src_bin that should go to dst_bin
# the remainder should go to dst_bin2
dst_bin_cnt = 0
if src_bin_width == 0 or dst_bin_width == 0:
dst_bin_cnt = src_bin_count
else:
# We divide counts in src_bin in proportion to range overlap with dst_bin
dst_bin_cnt = min(
round(
(dst_bin_end - src_bin_begin) / src_bin_width * src_bin_count
),
src_bin_count,
)
dst_histogram[dst_bin] += dst_bin_cnt
# remaining should go to dst_bin2
if dst_bin_cnt < src_bin_count:
dst_histogram[dst_bin2] += src_bin_count - dst_bin_cnt
def forward(self, x):
with torch.no_grad():
min_val = self.min_val
max_val = self.max_val
if min_val is None or max_val is None:
min_val = torch.min(x)
max_val = torch.max(x)
self.min_val = min_val
self.max_val = max_val
self.histogram = torch.histc(x, self.bins, min=min_val, max=max_val)
else:
new_min = torch.min(x)
new_max = torch.max(x)
new_histogram = torch.histc(x, self.bins, min=new_min, max=new_max)
# combine the existing histogram and new histogram into 1 histogram
combined_histogram = torch.zeros_like(self.histogram)
combined_min = torch.min(new_min, self.min_val)
combined_max = torch.max(new_max, self.max_val)
self._combine_histograms(
combined_histogram,
combined_min.item(),
combined_max.item(),
self.histogram,
self.min_val.item(),
self.max_val.item(),
)
self._combine_histograms(
combined_histogram,
combined_min.item(),
combined_max.item(),
new_histogram,
new_min.item(),
new_max.item(),
)
self.histogram = combined_histogram
self.min_val = combined_min
self.max_val = combined_max
return x
def calculate_qparams(self):
if self.min_val is None or self.max_val is None:
warnings.warn(
"must run observer before calling calculate_qparams.\
Returning default scale and zero point "
)
return torch.tensor([1.0]), torch.tensor([0])
assert self.bins == len(self.histogram), (
"The number of bins in histogram should be equal to the number of bins "
"supplied while making this observer"
)
new_min, new_max = self._non_linear_param_search()
return self._calculate_qparams(new_min.item(), new_max.item())
def _save_to_state_dict(self, destination, prefix, keep_vars):
super(HistogramObserver, self)._save_to_state_dict(destination, prefix, keep_vars)
destination[prefix + 'min_val'] = self.min_val
destination[prefix + 'max_val'] = self.max_val
def _load_from_state_dict(self, state_dict, prefix, local_metadata, strict,
missing_keys, unexpected_keys, error_msgs):
self.min_val = state_dict.pop(prefix + 'min_val')
self.max_val = state_dict.pop(prefix + 'max_val')
super(HistogramObserver, self)._load_from_state_dict(state_dict, prefix, local_metadata, False,
missing_keys, unexpected_keys, error_msgs)
[docs]class RecordingObserver(_ObserverBase):
r"""
The module is mainly for debug and records the tensor values during runtime
"""
__annotations__ = {"tensor_val": List[Optional[torch.Tensor]]}
def __init__(self, **kwargs):
super(RecordingObserver, self).__init__(**kwargs)
self.tensor_val = []
def forward(self, x):
self.tensor_val.append(x.clone())
return x
@torch.jit.export
def calculate_qparams(self):
raise Exception("calculate_qparams should not be called for RecordingObserver")
@torch.jit.export
def get_tensor_value(self):
return self.tensor_val
[docs]class NoopObserver(Observer):
r"""
Observer that doesn't do anything and just passes its configuration to the
quantized module's ``.from_float()`.
Primarily used for quantization to float16 which doesn't require determining
ranges.
"""
def __init__(self, dtype=torch.float16):
if dtype != torch.float16:
raise ValueError("Only float16 quantization can be used without calibration process")
super(NoopObserver, self).__init__(dtype=dtype)
def forward(self, x):
return x
def calculate_qparams(self):
raise Exception("calculate_qparams should not be called for NoopObserver")
# Restrict activations to be in the range (0,127)
default_observer = MinMaxObserver.with_args(reduce_range=True)
default_debug_observer = RecordingObserver
default_weight_observer = MinMaxObserver.with_args(dtype=torch.qint8, qscheme=torch.per_tensor_symmetric)
default_histogram_observer = HistogramObserver.with_args(reduce_range=True)
default_per_channel_weight_observer = PerChannelMinMaxObserver.with_args(dtype=torch.qint8, qscheme=torch.per_channel_symmetric)