
DBPack enables Python microservices to coordinate distributed transactions


Author: Zhu Han, China Electronic Cloud

What is a distributed transaction?

Almost every information system involves transaction processing. Its purpose is to ensure that system data matches expectations and that interrelated pieces of data never contradict one another; in other words, it keeps the data in a consistent state.

Classical database theory calls for atomicity, isolation, and durability. Atomicity requires that data is either fully modified or fully rolled back; isolation requires that transactions are independent and do not affect one another; durability requires that a committed transaction is persisted correctly and no data is lost. Row-oriented databases such as MySQL combine MVCC (multi-version concurrency control) views with WAL (write-ahead logging) to provide atomicity, isolation, and durability for multiple transactions when a single service uses a single data source, or a single service uses multiple data sources.
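
To ground the terminology before moving on, here is a minimal sketch of a local transaction, written with mysql.connector since that is the driver used later in this article; the connection parameters and the account table are hypothetical, not part of the original sample.

# Minimal sketch of a local transaction: one service, one data source.
# Host, credentials, and the account table are placeholders for illustration.
import mysql.connector

conn = mysql.connector.connect(host="127.0.0.1", user="demo", password="demo", database="demo")
cursor = conn.cursor()
try:
    conn.start_transaction()
    cursor.execute("UPDATE account SET balance = balance - 100 WHERE id = %s", (1,))
    cursor.execute("UPDATE account SET balance = balance + 100 WHERE id = %s", (2,))
    conn.commit()    # atomicity: both updates become visible together
except Exception:
    conn.rollback()  # ... or neither does
    raise
finally:
    cursor.close()
    conn.close()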

The book Phoenix Architecture describes it this way: a single service using a single data source is a local transaction, a single service using multiple data sources is a global transaction, and a distributed transaction is a transaction-processing mechanism in which multiple services access multiple data sources at the same time.

A brief introduction to DBPack

There are many ways to implement distributed transactions, such as reliable message queues, TCC transactions, and SAGA transactions.

The reliable message queue is the approach usually labelled BASE, a name deliberately chosen to echo ACID (the "acid versus base" pun is intentional). It grew out of the idea of eventual consistency and systematically summarizes a practical technique for handling distributed transactions.
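
As a hedged illustration of the reliable-message idea (not part of the original sample; the outbox table, topic name, and relay are hypothetical), the sketch below stores the business change and an outbox message in one local transaction; a separate relay would then deliver the message and retry until the downstream service eventually catches up.

# Illustrative outbox-style "reliable message" sketch; schema and names are hypothetical.
import json
import mysql.connector

conn = mysql.connector.connect(host="127.0.0.1", user="demo", password="demo", database="demo")
cursor = conn.cursor()

def place_order_with_outbox(order):
    try:
        conn.start_transaction()
        cursor.execute("INSERT INTO so_master (so_id, so_amt) VALUES (%s, %s)",
                       (order["so_id"], order["so_amt"]))
        # The message is persisted atomically with the business row ...
        cursor.execute("INSERT INTO outbox (topic, payload, status) VALUES (%s, %s, 'NEW')",
                       ("deduct-inventory", json.dumps(order)))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    # ... and a separate relay polls the outbox, publishes each 'NEW' message to a
    # queue, and retries until the consumer acknowledges it: eventual (BASE) rather
    # than immediate (ACID) consistency.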

TCC is more involved. As the name suggests, it consists of the following three phases (a minimal sketch follows the list).

  • Try: the attempt phase. Perform all business feasibility checks (guaranteeing consistency) and reserve all required business resources (guaranteeing isolation).
  • Confirm: the confirmation phase. Perform no further business checks; use the resources reserved in the Try phase to complete the business operation. Confirm may be executed repeatedly, so its operations must be idempotent.
  • Cancel: the cancellation phase. Release the business resources reserved in the Try phase. Cancel may also be executed repeatedly and must likewise be idempotent.
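
The sketch below is not taken from DBPack; the inventory service, function names, and in-memory state are hypothetical, purely to illustrate the three phases and the idempotency guards that Confirm and Cancel need.

# Hypothetical TCC branch for an inventory service; purely illustrative.
available = {"iphone13": 100}  # sellable stock
frozen = {}                    # xid -> (sku, qty) reserved in Try

def try_reserve(xid, sku, qty):
    # Try: check feasibility and reserve the resource (isolation by freezing stock).
    if available[sku] < qty:
        raise RuntimeError("insufficient stock")
    available[sku] -= qty
    frozen[xid] = (sku, qty)

def confirm(xid):
    # Confirm: no further checks; consume what Try reserved.
    # Idempotent: a repeated Confirm for the same xid is a no-op.
    frozen.pop(xid, None)

def cancel(xid):
    # Cancel: release what Try reserved. Also idempotent, and tolerant of a Cancel
    # that arrives before Try ever ran (an "empty rollback").
    reserved = frozen.pop(xid, None)
    if reserved:
        sku, qty = reserved
        available[sku] += qty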

SAGA splits a large transaction into several small ones: the whole distributed transaction T is decomposed into n sub-transactions, and a corresponding compensation action is designed for each sub-transaction. Although a compensation operation is usually easier to implement than freezing or revoking a resource, guaranteeing that the forward and backward recovery processes run rigorously takes considerable effort.
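
The minimal sketch below illustrates the pattern; the coordinator and the business steps are hypothetical, not DBPack code: run the sub-transactions in order and, on failure, execute the compensations of the completed steps in reverse.

# Illustrative SAGA coordinator; the business steps are placeholders.
def run_saga(steps):
    """steps is a list of (action, compensation) callables."""
    done = []
    try:
        for action, compensation in steps:
            action()
            done.append(compensation)
    except Exception:
        # Backward recovery: compensate completed sub-transactions in reverse order.
        for compensation in reversed(done):
            compensation()
        raise

# Hypothetical usage:
# run_saga([(create_order, cancel_order), (deduct_inventory, restore_inventory)])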

DBPack's distributed transaction support aims to be non-intrusive to the user's business code. It adopts the popular sidecar pattern, mainly uses the etcd watch mechanism to drive distributed transaction commit and rollback, intercepts HTTP traffic and MySQL traffic as a proxy, and supports the AT mode (automatically compensating SQL) and the TCC mode (automatically compensating HTTP requests).

The performance of DBPack's AT mode depends on how quickly the global lock is released: only the transaction that wins the global lock may modify the business data, so the faster the lock is released per unit of time, the more transactions can acquire it and the higher the throughput. In DBPack, creating a global transaction and registering a branch transaction are just two KV inserts into etcd; when the transaction commits or rolls back, the status of the corresponding data is updated, and the etcd watch mechanism notices the change and immediately processes the commit or rollback, eliminating many RPC round trips.
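
To make the watch-driven idea concrete, here is a hedged sketch using the python-etcd3 client; the key layout and status values are invented for illustration and do not reproduce DBPack's actual schema (DBPack itself is written in Go).

# Conceptual sketch of watch-driven commit/rollback; keys and statuses are hypothetical.
import etcd3

client = etcd3.client(host="etcd", port=2379)

def register_global_transaction(xid):
    client.put(f"/dt/global/{xid}", "BEGIN")        # creating a transaction is one KV insert

def commit(xid):
    client.put(f"/dt/global/{xid}", "COMMITTING")   # committing just flips the status value

def watch_and_drive():
    # The coordinator reacts to status changes as soon as etcd reports them,
    # instead of polling or exchanging extra RPCs.
    events, cancel = client.watch_prefix("/dt/global/")
    for event in events:
        status = event.value.decode()
        if status == "COMMITTING":
            pass  # drive branch commit here
        elif status == "ROLLBACKING":
            pass  # drive branch rollback here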

In DBPack's TCC mode, a request first reaches the sidecar, which registers the TCC branch transaction, ensuring that Prepare executes before Cancel. When operating on concrete business data, it is recommended to lock by XID and BranchID.
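
A minimal sketch of that recommendation follows; the lock table and helper are hypothetical: before touching business rows, the branch inserts a row keyed by (xid, branch_id), so a duplicate or out-of-order request is rejected instead of being applied twice.

# Illustrative (xid, branch_id) lock; the tcc_branch_lock table is hypothetical and
# is assumed to have a UNIQUE KEY on (xid, branch_id).
import mysql.connector

conn = mysql.connector.connect(host="127.0.0.1", user="demo", password="demo", database="demo")
cursor = conn.cursor()

def with_branch_lock(xid, branch_id, do_business):
    try:
        cursor.execute("INSERT INTO tcc_branch_lock (xid, branch_id) VALUES (%s, %s)",
                       (xid, branch_id))
        conn.commit()
    except mysql.connector.IntegrityError:
        return False  # this branch was already processed; stay idempotent
    do_business()
    return True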

DBPack empowers Python microservices

With the groundwork above laid, the rest of this article focuses on the Python microservice code and does not dive into the DBPack source code; readers who are interested can explore and debug the full sample on their own:

https://github.com/CECTC/dbpack-samples/blob/main/python

Three microservices are involved: the first is the transaction initiator, the second is the order system, and the last is the product inventory system. Every microservice is fronted by a DBPack proxy. After the initiator's request succeeds and the order commits normally, the product inventory should be deducted accordingly; if either microservice fails to complete, the other must be rolled back. In other words, the two microservices must stay consistent.

First, simulate the transaction initiator service. The service registers two handlers: one initiates a normal request and starts a distributed transaction through the DBPack proxy, while the other returns an error. Based on the HTTP response, the transaction initiator decides whether to trigger a distributed transaction rollback.

The code below uses the Flask web framework to implement the initiator's two handlers; with two HTTP requests we can simulate starting or rolling back a distributed transaction.

from flask import Flask, request, jsonify
import requests

app = Flask(__name__)

create_so_url = "http://order-svc:3001/createSo"
update_inventory_url = "http://product-svc:3002/allocateInventory"


@app.route('/v1/order/create', methods=['POST'])
def create_1():
    return create_so(rollback=False)


@app.route('/v1/order/create2', methods=['POST'])
def create_2():
    return create_so(rollback=True)


def create_so(rollback=True):
    # XID injected by the DBPack sidecar that proxies this service
    xid = request.headers.get("x-dbpack-xid")
    so_items = [dict(
        product_sysno=1,
        product_name="apple iphone 13",
        original_price=6799,
        cost_price=6799,
        deal_price=6799,
        quantity=2,
    )]
    so_master = [dict(
        buyer_user_sysno=10001,
        seller_company_code="SC001",
        receive_division_sysno=110105,
        receive_address="beijing",
        receive_zip="000001",
        receive_contact="scott",
        receive_contact_phone="18728828296",
        stock_sysno=1,
        payment_type=1,
        so_amt=6999 * 2,
        status=10,
        appid="dk-order",
        so_items=so_items,
    )]
    success = (jsonify(dict(success=True, message="success")), 200)
    failed = (jsonify(dict(success=False, message="failed")), 400)
    # Propagate the XID so the downstream services join the same global transaction
    headers = {
        "Content-Type": "application/json",
        "xid": xid
    }
    so_req = dict(req=so_master)
    resp1 = requests.post(create_so_url, headers=headers, json=so_req)
    if resp1.status_code == 400:
        return failed
    ivt_req = dict(req=[dict(product_sysno=1, qty=2)])
    resp2 = requests.post(update_inventory_url, headers=headers, json=ivt_req)
    if resp2.status_code == 400:
        return failed
    if rollback:
        print("rollback")
        return failed
    return success


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=3000)

So how do we put this service behind a DBPack proxy?

$ ./dist/dbpack start --config ../dbpack-samples/configs/config-aggregation.yaml
$ cat ../dbpack-samples/configs/config-aggregation.yaml
listeners:
  - protocol_type: http
    socket_address:
      address: 0.0.0.0
      port: 13000
    config:
      backend_host: aggregation-svc:3000
    filters:
      - httpDTFilter
filters:
  - name: httpDTFilter
    kind: HttpDistributedTransaction
    conf:
      appid: aggregationSvc
      transaction_infos:
        - request_path: "/v1/order/create"
          timeout: 60000
        - request_path: "/v1/order/create2"
          timeout: 60000
distributed_transaction:
  appid: aggregationSvc
  retry_dead_threshold: 130000
  rollback_retry_timeout_unlock_enable: true
  etcd_config:
    endpoints:
      - etcd:2379

As you might imagine, the two handlers of the microservice above are intercepted according to the request paths defined in the filters section of this configuration.

Next comes the order system.

from flask import Flask, jsonify, request
from datetime import datetime
import mysql.connector
import time
import random

app = Flask(__name__)

# The XID is embedded in the SQL as a hint-style comment so that the DBPack
# MySQL proxy can associate the statements with the global transaction.
insert_so_master = "INSERT /*+ XID('{xid}') */ INTO order.so_master({keys}) VALUES ({placeholders})"
insert_so_item = "INSERT /*+ XID('{xid}') */ INTO order.so_item({keys}) VALUES ({placeholders})"


def conn():
    retry = 0
    while retry < 3:
        time.sleep(5)
        try:
            c = mysql.connector.connect(
                host="dbpack3",
                port=13308,
                user="dksl",
                password="123456",
                database="order",
                autocommit=True,
            )
            if c.is_connected():
                db_Info = c.get_server_info()
                print("Connected to MySQL Server version ", db_Info)
                return c
        except Exception as e:
            print(e.args)
            retry += 1


connection = conn()
cursor = connection.cursor(prepared=True)


@app.route('/createSo', methods=['POST'])
def create_so():
    xid = request.headers.get('xid')
    reqs = request.get_json()
    if xid and "req" in reqs:
        for res in reqs["req"]:
            res["sysno"] = next_id()
            res["so_id"] = res["sysno"]
            res["order_date"] = datetime.now()
            res_keys = [str(k) for k, v in res.items() if k != "so_items" and str(v) != ""]
            so_master = insert_so_master.format(
                xid=xid,
                keys=", ".join(res_keys),
                placeholders=", ".join(["%s"] * len(res_keys)),
            )
            try:
                cursor.execute(so_master, tuple(res.get(k, "") for k in res_keys))
            except Exception as e:
                print(e.args)
            so_items = res["so_items"]
            for item in so_items:
                item["sysno"] = next_id()
                item["so_sysno"] = res["sysno"]
                item_keys = [str(k) for k, v in item.items() if str(v) != ""]
                so_item = insert_so_item.format(
                    xid=xid,
                    keys=", ".join(item_keys),
                    placeholders=", ".join(["%s"] * len(item_keys)),
                )
                try:
                    cursor.execute(so_item, tuple(item.get(k, "") for k in item_keys))
                except Exception as e:
                    print(e.args)
        return jsonify(dict(success=True, message="success")), 200
    return jsonify(dict(success=False, message="failed")), 400


def next_id():
    return random.randrange(0, 9223372036854775807)


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=3001)

Note that the SQL embeds the xid in a hint-style comment; this lets DBPack identify the statement and perform the corresponding distributed transaction handling, i.e. commit or rollback.
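
For clarity, this is roughly what one rendered statement looks like; the xid value below is invented for illustration, not a real DBPack-issued id.

# Hypothetical rendering of the insert template defined above.
print(insert_so_master.format(
    xid="gs/aggregationSvc/2612341069705662465",
    keys="sysno, so_id, buyer_user_sysno",
    placeholders="%s, %s, %s",
))
# INSERT /*+ XID('gs/aggregationSvc/2612341069705662465') */ INTO order.so_master(sysno, so_id, buyer_user_sysno) VALUES (%s, %s, %s)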

The database connection here uses autocommit. In addition, Python's mysql.connector library is used so that statements travel over MySQL's two-phase prepare/execute (prepared statement) protocol, as declared by prepared=True in the code.

Use the following command to proxy the order microservice with DBPack:

./dist/dbpack start --config ../dbpack-samples/configs/config-order.yaml

Finally, the product inventory system; the full code is as follows:

from flask import Flask, jsonify, request
import time
import mysql.connector

app = Flask(__name__)

# The XID hint lets the DBPack MySQL proxy tie this update to the global transaction.
allocate_inventory_sql = "update /*+ XID('{xid}') */ product.inventory set available_qty = available_qty - %s, allocated_qty = allocated_qty + %s where product_sysno = %s and available_qty >= %s;"


def conn():
    retry = 0
    while retry < 3:
        time.sleep(5)
        try:
            c = mysql.connector.connect(
                host="dbpack2",
                port=13307,
                user="dksl",
                password="123456",
                database="product",
                autocommit=True,
            )
            if c.is_connected():
                db_Info = c.get_server_info()
                print("Connected to MySQL Server version ", db_Info)
                return c
        except Exception as e:
            print(e.args)
            retry += 1


connection = conn()
cursor = connection.cursor(prepared=True)


@app.route('/allocateInventory', methods=['POST'])
def create_so():
    xid = request.headers.get('xid')
    reqs = request.get_json()
    if xid and "req" in reqs:
        for res in reqs["req"]:
            try:
                cursor.execute(allocate_inventory_sql.format(xid=xid),
                               (res["qty"], res["qty"], res["product_sysno"], res["qty"]))
            except Exception as e:
                print(e.args)
        return jsonify(dict(success=True, message="success")), 200
    return jsonify(dict(success=False, message="failed")), 400


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=3002)

Likewise, use the following command to proxy the product microservice with DBPack:

./dist/dbpack start --config ../dbpack-samples/configs/config-product.yaml

We can bring up all three microservices in one step with docker-compose:

docker-compose up

Under normal circumstances, the following request triggers a normal commit in both the order system and the product inventory system:

curl -XPOST http://localhost:13000/v1/order/create

The following command also calls the order and product inventory APIs normally, but regardless of whether those calls succeed, the transaction initiator returns an abnormal status code and thereby requests a rollback. Any branch that has already executed is then rolled back, keeping the distributed system consistent:

curl -XPOST http://localhost:13000/v1/order/create2

Reference material

  • Official repositories:
    https://github.com/CECTC/dbpack
    https://github.com/CECTC/dbpack-samples
    https://cectc.github.io/dbpack-doc/#/
  • Phoenix Architecture: http://icyfenix.cn/architect-perspective/general-architecture/transaction/distributed.html
