# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import logging
from typing import Any

import numpy as np
import onnx
import onnx.numpy_helper

try:
    from onnx.reference.op_run import to_array_extended
except ImportError:
    # old version of onnx.
    to_array_extended = None

from .calibrate import TensorData
from .onnx_model import ONNXModel
from .quant_utils import (
    ONNX_TYPE_TO_NP_TYPE,
    TENSOR_NAME_QUANT_SUFFIX,
    QuantType,
    find_by_name,
    model_has_infer_metadata,
    normalize_axis,
    pack_bytes_to_4bit,
    quantize_data,
    quantize_nparray,
    save_and_reload_model_with_shape_infer,
    tensor_proto_to_array,
)
from .tensor_quant_overrides import TensorQuantOverridesHelper


class QuantizationParams:
    def __init__(self, **data: Any):
        self.data = {}
        for k, v in data.items():
            if not isinstance(k, str):
                raise TypeError(f"Keys must be strings, not {type(k)}, for k={k!r}.")
            if not isinstance(v, (int, float, str, np.ndarray)):
                raise TypeError(f"Values must be numpy arrays, int, float, or str, not {type(v)}, for k={k!r}.")
            if k == "scale" and v.dtype not in (np.float32, np.float16):
                raise ValueError(f"scale must be a float32 or float16 numpy array but is {v.dtype} for k={k!r}.")
            self.data[k] = v

    def __iter__(self):
        yield from self.data

    def __getitem__(self, key):
        return self.data[key]

    def __len__(self):
        return len(self.data)
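
# Usage sketch (illustrative only; the keyword names below are arbitrary
# examples, not a fixed schema):
#
#     params = QuantizationParams(
#         zero_point=np.array([0], dtype=np.int8),
#         scale=np.array([0.1], dtype=np.float32),
#     )
#     assert len(params) == 2
#     assert params["scale"].dtype == np.float32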


class BaseQuantizer:
    def __init__(
        self,
        model,
        per_channel,
        reduce_range,
        weight_qType,
        activation_qType,
        tensors_range,
        nodes_to_quantize,
        nodes_to_exclude,
        op_types_to_quantize,
        extra_options=None,
    ):
        if not model_has_infer_metadata(model):
            model = save_and_reload_model_with_shape_infer(model)
        self.value_infos = {vi.name: vi for vi in model.graph.value_info}
        self.value_infos.update({ot.name: ot for ot in model.graph.output})
        self.value_infos.update({it.name: it for it in model.graph.input})

        self.model = ONNXModel(model)
        self.per_channel = per_channel  # weight-pack per channel
        self.reduce_range = reduce_range

        self.extra_options = extra_options if extra_options else {}
        self.enable_subgraph_quantization = (
            "EnableSubgraph" in self.extra_options and self.extra_options["EnableSubgraph"]
        )
        self.parent = None
        self.force_quantize_no_input_check = (
            "ForceQuantizeNoInputCheck" in self.extra_options and self.extra_options["ForceQuantizeNoInputCheck"]
        )
        self.is_weight_symmetric = self.extra_options.get(
            "WeightSymmetric", weight_qType in (QuantType.QInt8, QuantType.QInt16, QuantType.QFLOAT8E4M3FN)
        )
        self.is_activation_symmetric = self.extra_options.get("ActivationSymmetric", False)
        self.min_real_range = self.extra_options.get("MinimumRealRange")

        # weight_qType/activation_qType may be QuantType enums or raw
        # onnx.TensorProto type ints; getattr normalizes both to the int form.
        self.activation_qType = getattr(activation_qType, "tensor_type", activation_qType)
        self.weight_qType = getattr(weight_qType, "tensor_type", weight_qType)

        # Dictionary mapping tensor names to the min/max values observed during
        # calibration, stored as TensorData. Conceptually:
        # {
        #     "Conv_3:0": TensorData(lowest=np.float32(0), highest=np.float32(0.5)),
        #     "Conv_4:0": TensorData(lowest=np.float32(1), highest=np.float32(3.5)),
        # }
        if tensors_range is not None and any(not isinstance(t, TensorData) for t in tensors_range.values()):
            raise TypeError(
                f"tensors_range contains unexpected types {set(type(v) for v in tensors_range.values())}, not TensorData."
            )
        self.tensors_range = tensors_range
        self.nodes_to_quantize = nodes_to_quantize  # specific nodes to quantize
        self.nodes_to_exclude = nodes_to_exclude  # specific nodes to exclude
        self.op_types_to_quantize = op_types_to_quantize

        self.opset_version = self.check_opset_version()

        # Get tensor-level quantization overrides and ensure they are valid.
        self.tensor_quant_overrides = TensorQuantOverridesHelper(self.extra_options.get("TensorQuantOverrides", {}))

        self.initializers = {init.name: init for init in self.model.initializer()}
        overrides_valid, overrides_err = self.tensor_quant_overrides.is_valid(
            self.initializers, self.value_infos.keys(), activation_qType
        )
        if not overrides_valid:
            raise ValueError(overrides_err)

        self.tensor_quant_override_qtypes = self.tensor_quant_overrides.get_quant_types()

    def quantize_model(self):
        raise NotImplementedError

    def is_input_a_initializer(self, input_name):
        initializer = find_by_name(input_name, self.model.initializer())
        return initializer is not None

    def is_per_channel(self):
        return self.per_channel

    def is_valid_quantize_weight(self, weight_name):
        weight = find_by_name(weight_name, self.model.initializer())
        if weight is not None:
            return weight.data_type in (onnx.TensorProto.FLOAT, onnx.TensorProto.FLOAT16)
        if (not self.enable_subgraph_quantization) or (self.parent is None):
            return False
        return self.parent.is_valid_quantize_weight(weight_name)

    def should_quantize_node(self, node):
        if (
            self.nodes_to_quantize is not None
            and len(self.nodes_to_quantize) != 0
            and node.name not in self.nodes_to_quantize
        ):
            return False

        if node.op_type not in self.op_types_to_quantize:
            return False

        if self.nodes_to_exclude is not None and node.name in self.nodes_to_exclude:
            return False

        return True

    def check_opset_version(self):
        ai_onnx_domain = [
            opset for opset in self.model.model.opset_import if not opset.domain or opset.domain == "ai.onnx"
        ]
        if len(ai_onnx_domain) != 1:
            raise ValueError("Failed to find proper ai.onnx domain")
        opset_version = ai_onnx_domain[0].version

        if opset_version == 10:
            logging.warning(
                f"The original model opset version is {opset_version}, which does not support node fusions. "
                "Please update the model to opset >= 11 for better performance."
            )
            return 10

        if opset_version < 10:
            logging.warning(
                f"The original model opset version is {opset_version}, which does not support quantization. "
                "Updating the model automatically to opset 11. Please verify the quantized model."
            )
            self.model.model.opset_import.remove(ai_onnx_domain[0])
            self.model.model.opset_import.extend([onnx.helper.make_opsetid("", 11)])
            opset_version = 11

        if opset_version < 19 and self.weight_qType == onnx.TensorProto.FLOAT8E4M3FN:
            logging.warning(
                f"The original model opset version is {opset_version}, which does not support quantization to float 8. "
                "Updating the model automatically to opset 19. Please verify the quantized model."
            )
            self.model.model.opset_import.remove(ai_onnx_domain[0])
            self.model.model.opset_import.extend([onnx.helper.make_opsetid("", 19)])
            self.model.model.ir_version = 9
            opset_version = 19

        return opset_version

    def quantize_bias_static_impl(self, bias_name, input_scale, weight_scale, beta=1.0):
        """
        Quantize the bias: the zero point is 0 and the scale is input_scale * weight_scale * beta.
        """

        # get bias
        bias_initializer = find_by_name(bias_name, self.model.initializer())
        bias_data = tensor_proto_to_array(bias_initializer)
        quantized_bias_name = bias_name + TENSOR_NAME_QUANT_SUFFIX

        # quantize bias
        if self.weight_qType == onnx.TensorProto.FLOAT8E4M3FN:
            data = np.asarray(bias_data)
            if data.dtype == np.float16:
                node_qtype = onnx.TensorProto.FLOAT16
            elif data.dtype == np.float32:
                node_qtype = onnx.TensorProto.FLOAT
            else:
                raise TypeError(f"Only float16 or float32 are supported with float 8 but bias dtype is {data.dtype}.")
            quantized_data = data.astype(np.float32)
            bias_scale = np.array([1], dtype=quantized_data.dtype)
            bias_scale_data = bias_scale.reshape(-1)
            packed_bias_initializer = onnx.numpy_helper.from_array(quantized_data, quantized_bias_name)
            self.model.initializer_extend([packed_bias_initializer])
            node_type = "Cast"
        else:
            # calculate scale for bias
            # TODO: This formula should be explained including why the scale is not estimated for the bias as well.
            bias_scale = input_scale * weight_scale * beta

            quantized_data = (np.asarray(bias_data) / bias_scale).round().astype(np.int32)

            # update bias initializer
            bias_np_data = np.asarray(quantized_data, dtype=np.int32).reshape(bias_initializer.dims)
            packed_bias_initializer = onnx.numpy_helper.from_array(bias_np_data, quantized_bias_name)
            self.model.initializer_extend([packed_bias_initializer])

            # Bias's scale dtype should match the original bias data's unquantized type (float32 or float16).
            bias_scale_data = np.asarray(bias_scale, dtype=bias_data.dtype).reshape(-1)
            node_type = "DequantizeLinear"
            node_qtype = self.weight_qType

        # update scale initializer
        quantized_bias_scale_name = quantized_bias_name + "_scale"
        packed_bias_scale_initializer = onnx.numpy_helper.from_array(bias_scale_data, quantized_bias_scale_name)
        self.model.initializer_extend([packed_bias_scale_initializer])

        # update zero initializer
        if self.weight_qType == onnx.TensorProto.FLOAT8E4M3FN:
            tensor_type = self.weight_qType
        else:
            tensor_type = onnx.TensorProto.INT32

        quantized_bias_zp_name = quantized_bias_name + "_zero_point"
        if self.weight_qType == onnx.TensorProto.FLOAT8E4M3FN:
            packed_bias_zp_initializer = onnx.helper.make_tensor(quantized_bias_zp_name, self.weight_qType, [1], [0.0])
        elif bias_scale.size > 1:
            bias_zp_data = np.zeros(bias_scale.shape, dtype=np.int32).reshape(-1)
            packed_bias_zp_initializer = onnx.numpy_helper.from_array(bias_zp_data, quantized_bias_zp_name)
        else:
            packed_bias_zp_initializer = onnx.helper.make_tensor(quantized_bias_zp_name, tensor_type, [], [0])
        self.model.initializer_extend([packed_bias_zp_initializer])

        return (
            quantized_bias_name,
            quantized_bias_scale_name,
            quantized_bias_zp_name,
            bias_scale_data,
            node_type,
            node_qtype,
        )

    def quantize_initializer_impl(self, weight, qType, reduce_range=False, keep_float_weight=False):
        """
        :param weight: TensorProto initializer
        :param qType: type to quantize to
        :param keep_float_weight: If True, only the scale and zero point are computed
            and the weight itself is left in float. If False, the weight is quantized as well.
        :return: quantized weight name, zero point name, scale name
        """
        q_weight_name = weight.name + TENSOR_NAME_QUANT_SUFFIX
        zp_name = weight.name + "_zero_point"
        scale_name = weight.name + "_scale"

        # Quantize weight data. Use quantization overrides if provided by the user.
        weight_data = tensor_proto_to_array(weight)
        quant_overrides = self.tensor_quant_overrides.get_per_tensor_overrides(weight.name, default_val={})
        if "quant_type" in quant_overrides:
            qType = quant_overrides["quant_type"].tensor_type  # noqa: N806

        if "scale" in quant_overrides and "zero_point" in quant_overrides:
            zero_point = np.array(quant_overrides["zero_point"], dtype=ONNX_TYPE_TO_NP_TYPE[qType])
            scale = np.array(quant_overrides["scale"])
            q_weight_data = quantize_nparray(qType, weight_data.flatten(), scale, zero_point)
            assert isinstance(zero_point, np.ndarray), f"Unexpected type {type(zero_point)}"
            assert (
                zero_point.dtype != np.float32 and zero_point.dtype != np.float16
            ), f"Unexpected dtype {zero_point.dtype}"
            assert isinstance(scale, np.ndarray), f"Unexpected type {type(scale)}"

        else:
            _, _, zero_point, scale, q_weight_data = quantize_data(
                weight_data.flatten(),
                qType,
                quant_overrides.get("symmetric", self.is_weight_symmetric),
                reduce_range=quant_overrides.get("reduce_range", self.reduce_range and reduce_range),
                min_real_range=self.min_real_range,
                rmin_override=quant_overrides.get("rmin"),
                rmax_override=quant_overrides.get("rmax"),
            )

            assert isinstance(zero_point, np.ndarray), f"Unexpected type {type(zero_point)}"
            assert (
                zero_point.dtype != np.float32 and zero_point.dtype != np.float16
            ), f"Unexpected dtype {zero_point.dtype}"
            assert isinstance(scale, np.ndarray), f"Unexpected type {type(scale)}"

        scale_dtype = weight.data_type
        scale_initializer = onnx.helper.make_tensor(scale_name, scale_dtype, [], scale.reshape((-1,)).tolist())
        zero_initializer = onnx.helper.make_tensor(zp_name, qType, [], zero_point.reshape((-1,)).tolist())
        self.model.initializer_extend([scale_initializer, zero_initializer])

        if not keep_float_weight:
            if self.weight_qType == onnx.TensorProto.FLOAT8E4M3FN:
                q_weight_initializer = onnx.TensorProto()
                q_weight_initializer.data_type = self.weight_qType
                q_weight_initializer.dims.extend(weight.dims)
                q_weight_initializer.name = q_weight_name
                # Do not remove .flatten().copy(); numpy is not clear about data persistence.
                q_weight_initializer.raw_data = q_weight_data.flatten().copy().tobytes()
                if to_array_extended is not None:
                    # This test should not be needed but it helped catch some issues
                    # with data persistence and tobytes.
                    check = to_array_extended(q_weight_initializer)
                    if check.shape != weight_data.shape or check.tobytes() != q_weight_data.tobytes():
                        raise RuntimeError(
                            f"The initializer of shape {weight_data.shape} could not be created, expecting "
                            f"{q_weight_data.tobytes()[:10]}, got {check.tobytes()[:10]} and shape={weight.dims}"
                            f"\nraw={str(q_weight_initializer)[:200]}."
                        )
            elif qType in (onnx.TensorProto.INT4, onnx.TensorProto.UINT4):
                if q_weight_data.dtype not in (np.int8, np.uint8):
                    raise RuntimeError(
                        f"Quantized weights for {q_weight_name} must be 8-bit before packing as 4-bit values."
                    )

                # We do not use onnx.helper.pack_float32_to_4bit() due to performance.
                # This can be the difference between a large model taking 30 minutes to quantize vs 5 minutes.
                packed_data = bytes(pack_bytes_to_4bit(q_weight_data.tobytes()))
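                # For example (assuming the packer follows the ONNX convention
                # of placing the even-indexed element in the low nibble): the
                # int4 values [1, 2, 3, 4], stored one per byte, pack into the
                # two bytes 0x21 and 0x43.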

                # We only use onnx.helper.make_tensor with raw data due to bug: https://github.com/onnx/onnx/pull/6161
                q_weight_initializer = onnx.helper.make_tensor(q_weight_name, qType, weight.dims, packed_data, raw=True)
            else:
                q_weight_data = np.asarray(q_weight_data, dtype=onnx.helper.tensor_dtype_to_np_dtype(qType)).reshape(
                    weight.dims
                )
                q_weight_initializer = onnx.numpy_helper.from_array(q_weight_data, q_weight_name)
            self.model.initializer_extend([q_weight_initializer])

        return q_weight_name, zp_name, scale_name
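
    # Naming sketch (assuming TENSOR_NAME_QUANT_SUFFIX is "_quantized", as
    # defined in quant_utils): quantizing an initializer named "conv1.weight"
    # adds initializers "conv1.weight_quantized", "conv1.weight_zero_point",
    # and "conv1.weight_scale", and returns those three names in that order.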

    def quantize_weight_per_channel_impl(
        self,
        weight_name,
        weight_qType,
        channel_axis,
        reduce_range=True,
        keep_float_weight=False,
    ):
        initializer = find_by_name(weight_name, self.model.initializer())
        if initializer is None:
            raise ValueError(f"{weight_name} is not an initializer")

        weights = tensor_proto_to_array(initializer)
        weights_rank = len(weights.shape)
        is_axis_valid, axis_norm = normalize_axis(channel_axis, weights_rank)
        if not is_axis_valid:
            raise ValueError(
                f"Weight {weight_name} has a per-channel axis with value {channel_axis} that is "
                f"out-of-bounds for rank {weights_rank}"
            )

        channel_axis = axis_norm
        channel_count = weights.shape[channel_axis]
        quant_overrides_for_channels = self.tensor_quant_overrides.get_per_channel_overrides(
            weight_name, default_val=[{"axis": channel_axis}]
        )

        num_channel_overrides = len(quant_overrides_for_channels)
        if num_channel_overrides != 1 and num_channel_overrides != channel_count:
            raise ValueError(
                f"Per-channel tensor quantization overrides for {weight_name} must have "
                f"either 1 or {channel_count} elements in the list of dictionaries."
            )

        is_axis_override_valid, axis_override = normalize_axis(quant_overrides_for_channels[0]["axis"], weights_rank)
        if not is_axis_override_valid or axis_override != channel_axis:
            raise ValueError(
                f"Tensor quantization overrides for {weight_name} specify an unexpected axis. "
                f"Expected {channel_axis}, but got {quant_overrides_for_channels[0]['axis']}."
            )

        # If user provides per-channel quantization overrides, all channels must use the same quant_type,
        # axis, symmetric, and reduce_range values. So, just use the first channel's values.
        if "quant_type" in quant_overrides_for_channels[0]:
            weight_qType = quant_overrides_for_channels[0]["quant_type"].tensor_type  # noqa: N806

        symmetric = quant_overrides_for_channels[0].get(
            "symmetric",
            (
                self.is_weight_symmetric
                or weight_qType in (onnx.TensorProto.INT8, onnx.TensorProto.FLOAT8E4M3FN, onnx.TensorProto.INT4)
            ),
        )
        reduce_range = quant_overrides_for_channels[0].get("reduce_range", self.reduce_range and reduce_range)
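
        # Expected layout of the per-channel overrides (values are hypothetical):
        # either a single dictionary applied to every channel, e.g.
        #     [{"axis": 0, "symmetric": True}]
        # or one dictionary per channel, e.g.
        #     [{"axis": 0, "rmin": -1.0, "rmax": 1.0}, {"axis": 0, "rmin": -0.5, "rmax": 0.5}, ...]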
        zero_point_list = []
        scale_list = []
        quantized_per_channel_data_list = []
        for i in range(channel_count):
            per_channel_data = weights.take(i, channel_axis)
            channel_override_index = i if i < num_channel_overrides else 0
            channel_quant_overrides = quant_overrides_for_channels[channel_override_index]

            if "scale" in channel_quant_overrides and "zero_point" in channel_quant_overrides:
                zero_point = np.array(channel_quant_overrides["zero_point"], dtype=ONNX_TYPE_TO_NP_TYPE[weight_qType])
                scale = np.array(channel_quant_overrides["scale"])
                quantized_per_channel_data = quantize_nparray(
                    weight_qType, per_channel_data.flatten(), scale, zero_point
                )
                assert isinstance(zero_point, np.ndarray), f"Unexpected type {type(zero_point)}"
                assert (
                    zero_point.dtype != np.float32 and zero_point.dtype != np.float16
                ), f"Unexpected dtype {zero_point.dtype}"
                assert isinstance(scale, np.ndarray), f"Unexpected type {type(scale)}"
                assert isinstance(
                    quantized_per_channel_data, np.ndarray
                ), f"Unexpected type {type(quantized_per_channel_data)}"

            else:
                _, _, zero_point, scale, quantized_per_channel_data = quantize_data(
                    per_channel_data.flatten(),
                    weight_qType,
                    symmetric,
                    reduce_range=reduce_range,
                    min_real_range=self.min_real_range,
                    rmin_override=channel_quant_overrides.get("rmin"),
                    rmax_override=channel_quant_overrides.get("rmax"),
                )

                assert isinstance(zero_point, np.ndarray), f"Unexpected type {type(zero_point)}"
                assert (
                    zero_point.dtype != np.float32 and zero_point.dtype != np.float16
                ), f"Unexpected dtype {zero_point.dtype}"
                assert isinstance(scale, np.ndarray), f"Unexpected type {type(scale)}"
                assert isinstance(
                    quantized_per_channel_data, np.ndarray
                ), f"Unexpected type {type(quantized_per_channel_data)}"

            zero_point_list.append(zero_point)
            scale_list.append(scale)
            quantized_per_channel_data_list.append(quantized_per_channel_data)

        # combine per_channel_data into one
        weights_shape = list(weights.shape)
        reshape_dims = list(weights_shape)  # copy
        reshape_dims[channel_axis] = 1  # only one per channel for reshape
        quantized_weights = np.asarray(quantized_per_channel_data_list[0]).reshape(reshape_dims)
        for i in range(1, len(quantized_per_channel_data_list)):
            channel_weights = np.asarray(quantized_per_channel_data_list[i]).reshape(reshape_dims)
            quantized_weights = np.concatenate((quantized_weights, channel_weights), channel_axis)
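
        # Shape sketch (illustrative): for weights of shape (64, 32, 3, 3) with
        # channel_axis = 0, each of the 64 per-channel slices is reshaped to
        # (1, 32, 3, 3) and concatenated back along axis 0 into (64, 32, 3, 3).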

        q_weight_name = weight_name + TENSOR_NAME_QUANT_SUFFIX
        zp_name = weight_name + "_zero_point"
        scale_name = weight_name + "_scale"

        # Update packed weight, zero point, and scale initializers
        zero_scale_shape = [initializer.dims[channel_axis]]
        scale_initializer = onnx.helper.make_tensor(
            scale_name, initializer.data_type, zero_scale_shape, np.hstack(scale_list).tolist()
        )
        zero_initializer = onnx.helper.make_tensor(
            zp_name, weight_qType, zero_scale_shape, np.hstack(zero_point_list).tolist()
        )

        self.model.initializer_extend([scale_initializer, zero_initializer])

        if not keep_float_weight:
            if weight_qType in (onnx.TensorProto.INT4, onnx.TensorProto.UINT4):
                if quantized_weights.dtype not in (np.int8, np.uint8):
                    raise RuntimeError(
                        f"Quantized weights for {q_weight_name} must be 8-bit before packing as 4-bit values."
                    )

                # We do not use onnx.helper.pack_float32_to_4bit() due to performance.
                # This can be the difference between a large model taking 30 minutes to quantize vs 5 minutes.
                packed_data = bytes(pack_bytes_to_4bit(quantized_weights.tobytes()))

                # We only use onnx.helper.make_tensor with raw data due to bug: https://github.com/onnx/onnx/pull/6161
                q_weight_initializer = onnx.helper.make_tensor(
                    q_weight_name, weight_qType, weights_shape, packed_data, raw=True
                )
                self.model.initializer_extend([q_weight_initializer])
            else:
                quantized_weights = np.asarray(
                    quantized_weights,
                    dtype=onnx.helper.tensor_dtype_to_np_dtype(weight_qType),
                ).reshape(initializer.dims)
                q_weight_initializer = onnx.numpy_helper.from_array(quantized_weights, q_weight_name)
                self.model.initializer_extend([q_weight_initializer])

        return q_weight_name, zp_name, scale_name

    def adjust_tensor_ranges(self):
        if self.tensors_range is None:
            return

        for node in self.model.nodes():
            # Propagate the output range of Clip and Relu nodes back to their input
            # so that the input and output share the same quantization parameters.
            if node.op_type in ["Clip", "Relu"]:
                if self.is_activation_symmetric:
                    continue
                if not self.should_quantize_node(node):
                    continue
                if len(self.model.input_name_to_nodes()[node.input[0]]) != 1:
                    continue
                if node.input[0] not in self.tensors_range or node.output[0] not in self.tensors_range:
                    continue
                td = self.tensors_range[node.output[0]]
                if not isinstance(td, TensorData):
                    raise TypeError(f"Unexpected type {type(td)} for {node.output[0]!r}.")
                self.tensors_range[node.input[0]] = td
            # Softmax outputs are probabilities, so their range is fixed to [0.0, 1.0].
            elif node.op_type == "Softmax":
                self.tensors_range[node.output[0]] = TensorData(lowest=np.float32(0.0), highest=np.float32(1.0))