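"""Quantization support for the Conv and ConvTranspose operators.

Three rewriters live here: ConvInteger replaces a float Conv with a
ConvInteger + Cast + Mul sequence (the dynamic-quantization path),
QLinearConv replaces it with a single QLinearConv node (the static path),
and QDQConv marks the node's tensors for QuantizeLinear/DequantizeLinear
insertion in QDQ-format models.
"""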
import numpy as np
import onnx
from onnx import onnx_pb as onnx_proto

from ..quant_utils import (
    TENSOR_NAME_QUANT_SUFFIX,
    QuantizedValue,
    QuantizedValueType,
    attribute_to_kwarg,
    find_by_name,
    get_mul_node,
)
from .base_operator import QuantOperatorBase
from .qdq_base_operator import QDQOperatorBase


class ConvInteger(QuantOperatorBase):
    def __init__(self, onnx_quantizer, onnx_node):
        super().__init__(onnx_quantizer, onnx_node)

    def add_bias(self, nodes, scaled_output):
        """
        Handle the bias of a quantized Conv by appending a "Reshape" node on the
        bias (so it broadcasts against the conv output) followed by an "Add" node.

        parameter nodes: the new Reshape and Add nodes are appended to this list
        parameter scaled_output: output of the quantized conv without bias
        """
        node = self.node
        model = self.quantizer.model
        # Build the shape tensor the bias will be reshaped to.
        weight = find_by_name(node.input[1], model.initializer())
        if weight is None:
            raise ValueError(f"Expected {node.input[1]} to be an initializer")

        # Add a Reshape for correct broadcast.
        output = node.output[0]
        reshape_input_data = node.input[2]  # bias of Conv
        reshape_input_shape = output + "_bias_reshape_shape"
        reshape_output = output + "_bias_reshape_output"

        shape = np.ones((len(weight.dims)), dtype=np.int64)
        shape[1] = -1
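        # For example, a rank-4 NCHW Conv weight makes the target shape
        # [1, -1, 1, 1], so a bias of shape [M] is reshaped to [1, M, 1, 1]
        # and broadcasts against the conv output [N, M, H, W] in the Add below.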
        init_shape = onnx.helper.make_tensor(
            reshape_input_shape, onnx_proto.TensorProto.INT64, [len(weight.dims)], shape
        )
        model.add_initializer(init_shape)

        reshape_node = onnx.helper.make_node("Reshape", [reshape_input_data, reshape_input_shape], [reshape_output])
        nodes.append(reshape_node)

        # Add an Add operation for bias
        add_node = onnx.helper.make_node("Add", [scaled_output, reshape_output], [output], output + "_bias_add")
        nodes.append(add_node)

    def quantize(self):
        node = self.node
        assert node.op_type == "Conv"
        # Quantize both the activation (input[0]) and the weight (input[1]).
        (
            quantized_input_names,
            zero_point_names,
            scale_names,
            nodes,
        ) = self.quantizer.quantize_activation(node, [0])

        (
            quantized_input_names_weight,
            zero_point_names_weight,
            scale_names_weight,
            nodes_weight,
        ) = self.quantizer.quantize_weight(node, [1], reduce_range=self.quantizer.reduce_range)
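        # reduce_range requests a narrower quantization range for the weight
        # (e.g. 7 bits instead of 8) to reduce the risk of accumulator
        # saturation on some hardware.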
        quantized_input_names.extend(quantized_input_names_weight)
        zero_point_names.extend(zero_point_names_weight)
        scale_names.extend(scale_names_weight)
        nodes.extend(nodes_weight)

        conv_integer_output = node.output[0] + "_output_quantized"
        conv_integer_name = node.name + "_quant" if node.name else ""

        kwargs = {}
        for attribute in node.attribute:
            kwargs.update(attribute_to_kwarg(attribute))
        conv_integer_node = onnx.helper.make_node(
            "ConvInteger", quantized_input_names + zero_point_names, [conv_integer_output], conv_integer_name, **kwargs
        )
        nodes.append(conv_integer_node)

        # Add a Cast operation to cast the ConvInteger output to float.
        onnx_type = self.quantizer.get_tensor_type(node.output[0], mandatory=True)
        cast_op_output = conv_integer_output + "_cast_output"
        cast_node = onnx.helper.make_node(
            "Cast",
            [conv_integer_output],
            [cast_op_output],
            conv_integer_output + "_cast",
            to=onnx_type,  # TODO: FLOAT or FLOAT16
        )
        nodes.append(cast_node)
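        # Per the ONNX spec, ConvInteger computes conv(x_q - x_zp, w_q - w_zp)
        # in int32, which equals conv(x, w) / (x_scale * w_scale); the float
        # result is therefore recovered below by multiplying the cast output
        # by the product of the two input scales.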

        # Add a Mul operation to multiply the scales of the two inputs.
        assert len(scale_names) == 2
        if conv_integer_name:
            scales_mul_op = conv_integer_name + "_scales_mul"
        else:
            scales_mul_op = scale_names[0] + "_" + scale_names[1] + "_mul"

        scales_mul_node = find_by_name(scales_mul_op, self.quantizer.new_nodes)
        if scales_mul_node is None:
            scales_mul_node = get_mul_node(scale_names, scales_mul_op + ":0", scales_mul_op)
            nodes.append(scales_mul_node)

        scales_mul_op_output = scales_mul_node.output[0]

        has_bias = len(node.input) == 3
        scaled_output_name = node.output[0] if not has_bias else node.output[0] + "quant_scaled_output"

        # Add a Mul operation to multiply the scale product with the output of
        # ConvInteger, and make the output of this node the same as the output
        # of the original conv node.
        output_scale_mul_op = conv_integer_name + "_output_scale_mul" if conv_integer_name else ""
        nodes.append(
            get_mul_node(
                [cast_op_output, scales_mul_op_output],
                scaled_output_name,
                output_scale_mul_op,
            )
        )

        if has_bias:
            self.add_bias(nodes, scaled_output_name)

        self.quantizer.new_nodes += nodes


class QLinearConv(QuantOperatorBase):
    def __init__(self, onnx_quantizer, onnx_node):
        super().__init__(onnx_quantizer, onnx_node)

    def quantize(self):
        node = self.node
        assert node.op_type == "Conv"

        (
            data_found,
            output_scale_name,
            output_zp_name,
            _,
            _,
        ) = self.quantizer._get_quantization_params(node.output[0])
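        # data_found is False when no calibration-derived scale/zero point is
        # available for this output; the method falls back to the base
        # implementation further below in that case.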

        if self.quantizer.is_input_a_initializer(node.input[1]) and self.quantizer.is_per_channel():
            (
                quantized_input_names,
                zero_point_names,
                scale_names,
                nodes,
            ) = self.quantizer.quantize_activation(node, [0])
            quant_weight_tuple = self.quantizer.quantize_weight_per_channel(
                node.input[1], onnx_proto.TensorProto.INT8, 0  # self.quantizer.weight_qType?
            )
            quantized_input_names.append(quant_weight_tuple[0])
            zero_point_names.append(quant_weight_tuple[1])
            scale_names.append(quant_weight_tuple[2])
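            # The per-channel axis above is 0, the output-channel dimension of
            # a Conv weight ([M, C/group, kH, kW]): each output channel gets
            # its own scale and zero point.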
        else:
            (
                quantized_input_names,
                zero_point_names,
                scale_names,
                nodes,
            ) = self.quantizer.quantize_activation(node, [0])

            (
                quantized_input_names_weight,
                zero_point_names_weight,
                scale_names_weight,
                nodes_weight,
            ) = self.quantizer.quantize_weight(node, [1], reduce_range=self.quantizer.reduce_range)
            quantized_input_names.extend(quantized_input_names_weight)
            zero_point_names.extend(zero_point_names_weight)
            scale_names.extend(scale_names_weight)
            nodes.extend(nodes_weight)

        if not data_found or quantized_input_names is None:
            return super().quantize()

        quantized_bias_name = ""
        bias_present = False
        if len(node.input) == 3:
            if self.quantizer.weight_qType == onnx_proto.TensorProto.FLOAT8E4M3FN:
                raise RuntimeError("Quantization to FLOAT8E4M3FN for operator Conv is not supported.")
            quantized_bias_name = self.quantizer.quantize_bias_static(node.input[2], node.input[0], node.input[1])
            bias_present = True
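            # The static bias produced above is int32, quantized with
            # scale = input_scale * weight_scale and zero point 0, which is
            # what QLinearConv expects for its optional bias input.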

        qlinear_conv_output = node.output[0] + TENSOR_NAME_QUANT_SUFFIX
        qlinear_conv_name = node.name + "_quant" if node.name else ""

        kwargs = {}
        for attribute in node.attribute:
            kwargs.update(attribute_to_kwarg(attribute))
        qlinear_conv_inputs = []
        # Input 0: quantized activation, its scale, and its zero point.
        qlinear_conv_inputs.append(quantized_input_names[0])
        qlinear_conv_inputs.append(scale_names[0])
        qlinear_conv_inputs.append(zero_point_names[0])
        # Input 1: quantized weight, its scale, and its zero point.
        qlinear_conv_inputs.append(quantized_input_names[1])
        qlinear_conv_inputs.append(scale_names[1])
        qlinear_conv_inputs.append(zero_point_names[1])

        # Output scale and zero point.
        qlinear_conv_inputs.append(output_scale_name)
        qlinear_conv_inputs.append(output_zp_name)

        if bias_present:
            qlinear_conv_inputs.append(quantized_bias_name)

        qlinear_conv_node = onnx.helper.make_node(
            "QLinearConv", qlinear_conv_inputs, [qlinear_conv_output], qlinear_conv_name, **kwargs
        )
        nodes.append(qlinear_conv_node)

        # Create an entry for this quantized value so later consumers of the
        # original output reuse the quantized tensor instead of re-quantizing.
        q_output = QuantizedValue(
            node.output[0],
            qlinear_conv_output,
            output_scale_name,
            output_zp_name,
            QuantizedValueType.Input,
        )
        self.quantizer.quantized_value_map[node.output[0]] = q_output

        self.quantizer.new_nodes += nodes


class QDQConv(QDQOperatorBase):
    def __init__(self, onnx_quantizer, onnx_node):
        super().__init__(onnx_quantizer, onnx_node)

    def quantize(self):
        node = self.node
        assert node.op_type == "Conv" or node.op_type == "ConvTranspose"

        self.quantizer.quantize_activation_tensor(node.input[0])
        if not self.disable_qdq_for_node_output:
            self.quantizer.quantize_activation_tensor(node.output[0])

        is_weight_per_channel, weight_axis = self.quantizer.is_tensor_per_channel(
            node.input[1], default_axis=0 if node.op_type == "Conv" else 1
        )
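        # The default_axis above targets the output-channel dimension: axis 0
        # for a Conv weight ([M, C/group, kH, kW]) and axis 1 for a
        # ConvTranspose weight ([C, M/group, kH, kW]).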
        if is_weight_per_channel:
            self.quantizer.quantize_weight_tensor_per_channel(node.input[1], weight_axis)
        else:
            self.quantizer.quantize_weight_tensor(node.input[1])

        if len(node.input) == 3:
            self.quantizer.quantize_bias_tensor(node.name, node.input[2], node.input[0], node.input[1])