
Python-gRPC practice (7): gRPC error passing


Preface

In the earlier article 《Python-gRPC practice (3): Implementing a gRPC service with Python》, the gRPC service passed errors with a custom protocol. That is not an elegant solution, because it is poorly compatible with anything outside our own services. Fortunately, there is an officially defined scheme that lets different services pass errors to each other.

1. Custom error delivery

When writing an ordinary HTTP/1.1 interface, we usually define a set of business-related error codes to distinguish business errors from the errors indicated by the HTTP status code. For example, a structure like this is usually returned:

{
    "code": "0",
    "msg": "success",
    "data": {}
}

This structure contains three fields, code, msg and data, which hold the error code, the error message, and the payload to return. After the client receives the response, it checks the value of code: if it is one of the defined success codes, the client extracts the payload from data; otherwise it raises an exception carrying msg.
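As a rough illustration of the client-side check described above, here is a minimal sketch that uses only the standard library. The `BusinessError` class, the `unwrap` helper, and the choice of "0" as the only success code are assumptions made for this example; they do not come from any real API.

```python
import json
from typing import Any, Dict


class BusinessError(Exception):
    """Hypothetical exception that carries the business error code."""

    def __init__(self, code: str, msg: str) -> None:
        super().__init__(msg)
        self.code = code


def unwrap(raw_body: str) -> Dict[str, Any]:
    """Return `data` on success, otherwise raise an exception built from `msg`."""
    body = json.loads(raw_body)
    # Assumption: "0" is the only success code in this sketch.
    if body["code"] == "0":
        return body["data"]
    raise BusinessError(body["code"], body["msg"])
```

The caller then works with plain return values and exceptions, and never has to inspect code or msg itself.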

gRPC is no exception. Making a gRPC call feels like calling an ordinary function, but gRPC services interact by exchanging messages, and the request message and response message of each call are fixed. An error message will normally have a different structure from the response message, so unless the error structure happens to match the response body we have to find another way, for example by embedding error fields in every response body, as follows:

message Demo {
    string a = 1;
    int32 b = 2;
    int32 err_code = 3;
    string err_msg = 4;
}

When the server finds that a call has failed, it converts the error into the corresponding err_code and err_msg, puts them into the message, and returns it to the client. Each time the client receives a response, it checks whether err_code has a value. If it does, the request failed, and the client uses only err_code and err_msg to build an exception and raise it to the caller; otherwise it returns the data normally.
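The client-side check for embedded error fields might look roughly like the sketch below. The `Demo` dataclass is only a stand-in for the class that protoc would generate from the message above, and `RemoteCallError` and `check_response` are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass


@dataclass
class Demo:
    """Stand-in for the generated `Demo` message (not real protobuf code)."""

    a: str = ""
    b: int = 0
    err_code: int = 0  # proto3 scalar fields read as 0 / "" when unset
    err_msg: str = ""


class RemoteCallError(Exception):
    """Hypothetical exception raised when a response carries an error."""

    def __init__(self, err_code: int, err_msg: str) -> None:
        super().__init__(err_msg)
        self.err_code = err_code


def check_response(resp: Demo) -> Demo:
    """Raise if the response carries an error, otherwise pass it through."""
    # A non-zero err_code marks the response as failed.
    if resp.err_code:
        raise RemoteCallError(resp.err_code, resp.err_msg)
    return resp
```

The cost of this design is visible even in the sketch: every response message has to reserve the same two fields, and every call site has to run the same check.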

This approach works for every call, but it is not very elegant. It would be better to send the error to the client through some other protocol container and let the client parse the error information from that container and raise an exception. The gRPC service I introduced earlier uses gRPC metadata to carry this data. To handle exception capture on the server and exception generation on the client automatically, a top-level interceptor is installed on both the client and the server. The server's top-level interceptor is shown below (other interceptors may also raise errors, so the interceptor that catches errors must be placed at the top level):

# code url: https://github.com/so1n/grpc-example-common/blob/v0.1.5/grpc_example_common/interceptor/server_interceptor/customer_top.py
import logging
import time
from typing import Any, Callable, List, Tuple

import grpc
from grpc_example_common.helper.context import context_proxy

from .base import BaseInterceptor


class CustomerTopInterceptor(BaseInterceptor):
    def intercept(
        self,
        next_handler_method: Callable,
        request_proto_message: Any,
        context: grpc.ServicerContext,
    ) -> Any:
        return_initial_metadata: List[Tuple] = [("customer-user-agent", "Python3")]
        try:
            # Execute the gRPC call
            return next_handler_method(request_proto_message, context)
        except Exception as e:
            # Only set the error information for clients that sent this key-value pair
            if self.metadata_dict.get("customer-user-agent", "") == "Python3":
                return_initial_metadata.append(("exc_name", e.__class__.__name__))
                return_initial_metadata.append(("exc_info", str(e)))
            # Re-raise so that the gRPC server can catch the exception and do its own follow-up handling
            raise e
        finally:
            # Finally, send the metadata to the client
            context.send_initial_metadata(return_initial_metadata)

The interceptor catches the exception raised by the call and stores the exception class name and the exception message in the metadata. There are reasons for putting these values in the metadata rather than setting the error code and error message with context.set_code and context.set_details.

First, code: gRPC restricts which code values may be set, which prevents us from defining custom codes. Business error codes should not be mixed into the response status code anyway, so context.set_code is not used here. As for set_details: after the gRPC server catches an exception, it processes that exception and then writes the exception information into details itself, as follows:

def _call_behavior(rpc_event, state, behavior, argument, request_deserializer, send_response_callback=None):
    from grpc import _create_servicer_context
    with _create_servicer_context(rpc_event, state,
                                  request_deserializer) as context:
        try:
            response_or_iterator = None
            # Invoke the request handler
            if send_response_callback is not None:
                response_or_iterator = behavior(argument, context,
                                                send_response_callback)
            else:
                response_or_iterator = behavior(argument, context)
            return response_or_iterator, True
        except Exception as exception:  # pylint: disable=broad-except
            with state.condition:
                if state.aborted:
                    _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
                           b'RPC Aborted')
                elif exception not in state.rpc_errors:
                    # The exception is not a gRPC error, so its message is written into details
                    details = 'Exception calling application: {}'.format(
                        exception)
                    _LOGGER.exception(details)
                    _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
                           _common.encode(details))
            return None, False

This means that even if the interceptor sets details, the exception it raises is not a gRPC exception, so details ends up being overwritten with the exception information.

Now that the server-side interceptor is clear, let's look at the client-side interceptor. The code is as follows:

# code url: https://github.com/so1n/grpc-example-common/blob/v0.1.5/grpc_example_common/interceptor/client_interceptor/customer_top.py
import inspect
import logging
from typing import Any, Callable, Dict, List, Optional, Type

from .base import GRPC_RESPONSE, BaseInterceptor, ClientCallDetailsType


class CustomerTopInterceptor(BaseInterceptor):
    def __init__(self, exc_list: Optional[List[Type[Exception]]] = None):
        self.exc_dict: Dict[str, Type[Exception]] = {}
        for key, exc in globals()["__builtins__"].items():
            # Register Python's built-in exceptions
            if inspect.isclass(exc) and issubclass(exc, Exception):
                self.exc_dict[key] = exc
        if exc_list:
            # Register user-specified exceptions
            for exc in exc_list:
                if issubclass(exc, Exception):
                    self.exc_dict[exc.__name__] = exc

    def intercept(
        self,
        method: Callable,
        request_or_iterator: Any,
        call_details: ClientCallDetailsType,
    ) -> GRPC_RESPONSE:
        if call_details.metadata is not None:
            # Add the agreed marker
            call_details.metadata.append(("customer-user-agent", "Python3"))  # type: ignore
        response: GRPC_RESPONSE = method(call_details, request_or_iterator)
        metadata_dict: dict = {item.key: item.value for item in response.initial_metadata()}
        if metadata_dict.get("customer-user-agent") == "Python3":
            # Extract the exception information
            exc_name: str = metadata_dict.get("exc_name", "")
            exc_info: str = metadata_dict.get("exc_info", "")
            # Look up the exception class by exc_name
            exc: Optional[Type[Exception]] = self.exc_dict.get(exc_name)
            if exc:
                # Raise the exception
                raise exc(exc_info)
        return response

As you can see, the client interceptor reads the metadata returned by the server to determine whether it contains exception information. If so, it extracts the information and raises an error; otherwise it returns the response normally. As long as the client and the server both have the right interceptors installed, the client can obtain the server's error information and raise an exception. However, this implementation relies on gRPC metadata to carry the data, and metadata values must be ASCII strings or valid bytes; otherwise the request cannot be sent, or may even hang. This means the error information needs some form of serialization.
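One possible way to do that serialization, sketched with the standard library only, is to Base64-encode the exception message before putting it into the metadata and to decode it on the client before rebuilding the exception. The helper names `encode_exc`, `decode_exc` and `builtin_exceptions` are hypothetical, and the use of Base64 is an assumption of this sketch rather than what the interceptors above actually do.

```python
import base64
import builtins
import inspect
from typing import Dict, List, Tuple, Type


def encode_exc(exc: Exception) -> List[Tuple[str, str]]:
    """Build ASCII-only metadata pairs for an exception."""
    # Base64 keeps the value ASCII even if the message contains e.g. Chinese text.
    info = base64.b64encode(str(exc).encode("utf-8")).decode("ascii")
    return [("exc_name", type(exc).__name__), ("exc_info", info)]


def builtin_exceptions() -> Dict[str, Type[Exception]]:
    """Map the names of built-in exception classes to the classes themselves."""
    return {
        name: obj
        for name, obj in vars(builtins).items()
        if inspect.isclass(obj) and issubclass(obj, Exception)
    }


def decode_exc(metadata: List[Tuple[str, str]]) -> Exception:
    """Rebuild an exception from the pairs produced by `encode_exc`."""
    pairs = dict(metadata)
    message = base64.b64decode(pairs["exc_info"]).decode("utf-8")
    # Unknown names fall back to RuntimeError (a choice made for this sketch).
    exc_type = builtin_exceptions().get(pairs["exc_name"], RuntimeError)
    return exc_type(message)
```

Exception classes whose constructors need more than one argument (UnicodeDecodeError, for example) would need special handling. gRPC also treats metadata keys ending in `-bin` as binary-valued, which is another way to carry non-ASCII data.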

2. Implementation of error transmission based on official protocol

Because the implementation above is not very elegant, I searched online for an official one and eventually found the official error-passing example on GitHub. The official server example code is as follows:

def create_greet_limit_exceed_error_status(name):
    # Create a Message object
    detail = any_pb2.Any()
    # Pack the custom error into an Any object so that validation does not fail when the message is sent and received
    detail.Pack(
        error_details_pb2.QuotaFailure(violations=[
            error_details_pb2.QuotaFailure.Violation(
                subject="name: %s" % name,
                description="Limit one greeting per person",
            )
        ],))
    # Build a Status object, which has three fields: code, message and details
    return status_pb2.Status(
        code=code_pb2.RESOURCE_EXHAUSTED,
        message='Request limit exceeded.',
        # Array of error objects
        details=[detail],
    )


class LimitedGreeter(helloworld_pb2_grpc.GreeterServicer):
    def __init__(self):
        self._lock = threading.RLock()
        self._greeted = set()

    def SayHello(self, request, context):
        # The gRPC method being served
        with self._lock:
            if request.name in self._greeted:
                rich_status = create_greet_limit_exceed_error_status(
                    request.name)
                context.abort_with_status(rpc_status.to_status(rich_status))
            else:
                self._greeted.add(request.name)
        return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)

The logic of SayHello in the sample code is very simple. If name has not been seen before, it adds name to the set and returns normally. If name already exists, it first builds a Status object, converts it into a _Status object with to_status, and finally passes the _Status object to abort_with_status, which sends the error data to the client.

abort_with_status raises an exception inside the request handler so that the call terminates in an error state, and then sends the user-specified status to the client. The source code of to_status is as follows:

def to_status(status):
    return _Status(code=code_to_grpc_status_code(status.code),
                   details=status.message,
                   trailing_metadata=((GRPC_DETAILS_METADATA_KEY,
                                       status.SerializeToString()),))

The source shows that this function converts status.code into the gRPC response code, uses status.message as the gRPC details, and finally serializes status into a byte string and stores it in the metadata under GRPC_DETAILS_METADATA_KEY.

The client side is relatively simple. The source code is as follows:

def process(stub):
    try:
        response = stub.SayHello(helloworld_pb2.HelloRequest(name='Alice'))
        _LOGGER.info('Call success: %s', response.message)
    except grpc.RpcError as rpc_error:
        _LOGGER.error('Call failure: %s', rpc_error)
        # Extract the Status object from `grpc.RpcError`
        status = rpc_status.from_call(rpc_error)
        for detail in status.details:
            # Check whether the detail holds the expected message: if so, log the error; if not, raise
            if detail.Is(error_details_pb2.QuotaFailure.DESCRIPTOR):
                info = error_details_pb2.QuotaFailure()
                detail.Unpack(info)
                _LOGGER.error('Quota failure: %s', info)
            else:
                raise RuntimeError('Unexpected failure: %s' % detail)

In this code, a normal response is simply logged. If the call fails, the client sees that the status code of the response is not a success code and raises grpc.RpcError; the Status is then extracted from that exception with rpc_status.from_call. The logic of this function is also simple. The source code is as follows:

def from_call(call):
    # No metadata at all: return None directly
    if call.trailing_metadata() is None:
        return None
    # Otherwise iterate over the metadata
    for key, value in call.trailing_metadata():
        # If the key is the officially designated key, enter the extraction logic
        if key == GRPC_DETAILS_METADATA_KEY:
            # Deserialize the data into a message object
            rich_status = status_pb2.Status.FromString(value)
            # Verify that the status data is consistent with the response
            if call.code().value[0] != rich_status.code:
                raise ValueError(
                    'Code in Status proto (%s) doesn\'t match status code (%s)'
                    % (code_to_grpc_status_code(rich_status.code), call.code()))
            if call.details() != rich_status.message:
                raise ValueError(
                    'Message in Status proto (%s) doesn\'t match status details (%s)'
                    % (rich_status.message, call.details()))
            return rich_status
    return None

The source shows that the logic is the same as in the custom error passing: the data is extracted from the metadata and assembled into an error object. Note, however, that the call parameter of from_call accepts not only grpc.RpcError but also the response object, because the trailing_metadata, code and details methods that from_call uses are available on both grpc.RpcError and the response object.
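To make the round trip between to_status and from_call easier to follow, here is a standard-library model of it. It is only a sketch: `FakeCall` and `to_call` are hypothetical stand-ins, JSON takes the place of protobuf serialization, and status codes are plain integers instead of `grpc.StatusCode` members. The metadata key is the value that `grpc_status` uses for GRPC_DETAILS_METADATA_KEY.

```python
import json
from dataclasses import dataclass
from typing import List, Optional, Tuple

# The "-bin" suffix marks the metadata value as binary.
GRPC_DETAILS_METADATA_KEY = "grpc-status-details-bin"


@dataclass(frozen=True)
class FakeCall:
    """Stand-in for a finished RPC, exposing the three methods from_call needs."""

    status_code: int
    status_details: str
    metadata: Tuple[Tuple[str, bytes], ...] = ()

    def code(self) -> int:
        return self.status_code

    def details(self) -> str:
        return self.status_details

    def trailing_metadata(self) -> Tuple[Tuple[str, bytes], ...]:
        return self.metadata


def to_call(code: int, message: str, details: List[dict]) -> FakeCall:
    """Server side: mirror code and message, and attach the serialized status."""
    # JSON stands in for Status.SerializeToString() in this sketch.
    blob = json.dumps({"code": code, "message": message, "details": details}).encode()
    return FakeCall(code, message, ((GRPC_DETAILS_METADATA_KEY, blob),))


def from_call(call: FakeCall) -> Optional[dict]:
    """Client side: find the status blob and check it against the call itself."""
    for key, value in call.trailing_metadata():
        if key == GRPC_DETAILS_METADATA_KEY:
            status = json.loads(value)
            if call.code() != status["code"]:
                raise ValueError("code in status does not match the call code")
            if call.details() != status["message"]:
                raise ValueError("message in status does not match the call details")
            return status
    return None
```

The consistency checks matter because the code and message travel twice, once as the call's own status and once inside the serialized blob, and the client rejects a response in which the two disagree.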

After this quick look at the official error-passing example, it is clear that the official approach is very similar to the custom one. It simply defines a standard key, so everyone agrees that the value under that key is a serialized Status object (and because it is serialized, non-ASCII characters are not a concern). The Status object contains three fields, code, message and details, which correspond to code, msg and data in the error structure shown earlier:

{
    "code": "0",
    "msg": "success",
    "data": {}
}

Note, however, that details is an array, so it can hold several custom Message objects.

3. Redesign the error passing implementation

The official example shows that the server's business logic has to call context.abort_with_status itself to put the error information into the metadata, and the client has to catch grpc.RpcError and unpack the error itself. That is very verbose for the business layer, so I tried to combine the official error-passing protocol with the custom error-passing interceptors.

The first step is to define an internally unified message:

message Exec {
    string name = 1;  // exception name
    string msg = 2;   // exception message
}

This message is only used by internal business services. If the server is developed for other departments and they do not support this message, they can still tell what kind of error occurred from the status code and details.
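The fallback described above can be modeled with a few lines of plain Python. This is a sketch under stated assumptions: `Detail` and `Status` are simplified stand-ins for `google.protobuf.Any` and `google.rpc.Status`, and `KNOWN_EXCEPTIONS` and `to_exception` are hypothetical names that do not exist in any library.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Type


@dataclass
class Detail:
    """Stand-in for google.protobuf.Any: a type name plus its payload."""

    type_name: str
    payload: Dict[str, str]


@dataclass
class Status:
    """Stand-in for google.rpc.Status."""

    code: int
    message: str
    details: List[Detail] = field(default_factory=list)


# Hypothetical registry of exception classes the client is willing to rebuild.
KNOWN_EXCEPTIONS: Dict[str, Type[Exception]] = {
    "ValueError": ValueError,
    "KeyError": KeyError,
}


def to_exception(status: Status) -> Exception:
    """Turn a status into an exception, using Exec when it is present."""
    for detail in status.details:
        # Internal services understand the Exec message and rebuild the exception.
        if detail.type_name == "Exec":
            exc_type = KNOWN_EXCEPTIONS.get(detail.payload["name"], RuntimeError)
            return exc_type(detail.payload["msg"])
    # Consumers that do not know Exec rely on code and message alone.
    return RuntimeError(f"[{status.code}] {status.message}")
```

A consumer that knows Exec gets the original exception type back, while any other consumer still receives a usable error built from the standard fields.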

Next, rework the server's top-level interceptor. Only the exception-handling part needs to change. The source code is as follows:

# code url: https://github.com/so1n/grpc-example-common/blob/v0.1.7/grpc_example_common/interceptor/server_interceptor/customer_top.py
class CustomerTopInterceptor(BaseInterceptor):
    def intercept(
        self,
        next_handler_method: Callable,
        request_proto_message: Any,
        context: grpc.ServicerContext,
    ) -> Any:
        try:
            # Execute the service call
            return next_handler_method(request_proto_message, context)
        except Exception as e:
            # Create a Message object
            detail = any_pb2.Any()
            # Pack the custom error into an Any object so that validation does not fail
            # when the message is sent and received.
            # Note that the message packed here is our own Exec message.
            detail.Pack(
                Exec(
                    name=e.__class__.__name__,
                    msg=str(e)
                )
            )
            # Use abort_with_status to send the data to the client through the metadata
            context.abort_with_status(
                rpc_status.to_status(
                    status_pb2.Status(
                        # Only a gRPC status code belongs here, not a business code:
                        # a business error code may be 2001 while the HTTP status code is still 200.
                        code=code_pb2.RESOURCE_EXHAUSTED,
                        message=str(e),
                        # details is an array, so several kinds of error objects could be packed
                        # here for compatibility with different systems; internal calls use only this one.
                        details=[detail],
                    )
                )
            )
            # Re-raise the exception. The gRPC server sees that the call is already marked as aborted
            # and does nothing more with it, but re-raising is still useful for other components:
            # for example, the official opentelemetry implementation wraps the channel in another
            # channel, so it needs to catch the exception to generate the corresponding Event.
            raise e

Then rework the client's top-level interceptor. Likewise, only the way the data is obtained needs to change. The source code is as follows:

# code url: https://github.com/so1n/grpc-example-common/blob/v0.1.7/grpc_example_common/interceptor/client_interceptor/customer_top.py
class CustomerTopInterceptor(BaseInterceptor):
    # Exception registration code omitted
    ...

    def intercept(
        self,
        method: Callable,
        request_or_iterator: Any,
        call_details: ClientCallDetailsType,
    ) -> GRPC_RESPONSE:
        response: GRPC_RESPONSE = method(call_details, request_or_iterator)
        # As mentioned earlier, `from_call` also accepts the response object returned by `method` in a client interceptor
        status: Optional[status_pb2.Status] = rpc_status.from_call(response)
        # A value other than None means error data was received
        if status:
            for detail in status.details:
                # Check whether this detail is the Message we want
                if detail.Is(Exec.DESCRIPTOR):
                    # Deserialize to get the data
                    exec_instance: Exec = Exec()
                    detail.Unpack(exec_instance)
                    # Build the exception and raise it
                    exec_class: Type[Exception] = self.exc_dict.get(exec_instance.name) or RuntimeError
                    raise exec_class(exec_instance.msg)
                else:
                    raise RuntimeError('Unexpected failure: %s' % detail)
        return response

With that, the new error-passing implementation is complete. Now let's verify the result with a simple demo. The demo code is as follows:

# grpc_example_common url: https://github.com/so1n/grpc-example-common/tree/v0.1.7
# Server code
from concurrent import futures
from typing import List

import grpc
from grpc_example_common.interceptor.server_interceptor.base import BaseInterceptor
from google.protobuf.empty_pb2 import Empty  # type: ignore
from grpc_example_common.protos.user import user_pb2 as user_message
from grpc_example_common.interceptor.server_interceptor.customer_top import CustomerTopInterceptor
from grpc_example_common.protos.user import user_pb2_grpc as user_service


class UserService(user_service.UserServicer):
    def delete_user(self, request: user_message.DeleteUserRequest, context: grpc.ServicerContext) -> Empty:
        uid: str = request.uid
        if uid == "123":
            return Empty()
        else:
            raise ValueError(f"Not found user:{uid}")


def main(host: str = "127.0.0.1", port: str = "9000") -> None:
    interceptor_list: List[BaseInterceptor] = [CustomerTopInterceptor()]
    server: grpc.Server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=interceptor_list,
    )
    user_service.add_UserServicer_to_server(UserService(), server)
    server.add_insecure_port(f"{host}:{port}")
    server.start()
    try:
        server.wait_for_termination()
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == "__main__":
    main()

# Client code
import grpc
from grpc_example_common.protos.user import user_pb2 as user_message
from grpc_example_common.protos.user import user_pb2_grpc as user_service
from grpc_example_common.interceptor.client_interceptor.customer_top import CustomerTopInterceptor

channel: grpc.Channel = grpc.intercept_channel(
    grpc.insecure_channel("127.0.0.1:9000"), CustomerTopInterceptor()
)
user_stub: user_service.UserStub = user_service.UserStub(channel)
# uid "123" exists, so this call succeeds
user_stub.delete_user(user_message.DeleteUserRequest(uid="123"))
# uid "456" does not exist, so the server raises ValueError
user_stub.delete_user(user_message.DeleteUserRequest(uid="456"))

Once the demo is written, run it. The client throws the following error:

Traceback (most recent call last):
  File "/home/so1n/github/grpc-example-project/grpc-example-api-backend-service/demo.py", line 11, in <module>
    user_stub.delete_user(user_message.DeleteUserRequest(uid="456"))
  File "/home/so1n/github/grpc-example-project/grpc-example-api-backend-service/.venv/lib/python3.8/site-packages/grpc/_interceptor.py", line 216, in __call__
    response, ignored_call = self._with_call(request,
  File "/home/so1n/github/grpc-example-project/grpc-example-api-backend-service/.venv/lib/python3.8/site-packages/grpc/_interceptor.py", line 254, in _with_call
    call = self._interceptor.intercept_unary_unary(continuation,
  File "/home/so1n/github/grpc-example-project/grpc-example-api-backend-service/.venv/lib/python3.8/site-packages/grpc_example_common/interceptor/client_interceptor/base.py", line 74, in intercept_unary_unary
    return self.intercept(continuation, request, call_details)
  File "/home/so1n/github/grpc-example-project/grpc-example-api-backend-service/.venv/lib/python3.8/site-packages/grpc_example_common/interceptor/client_interceptor/customer_top.py", line 44, in intercept
    raise exec_class(exec_instance.msg)
ValueError: Not found user:456

The output shows that the redesigned error passing works as intended: the ValueError raised on the server is raised again on the client with the same message.

