1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
| from datetime import datetime from Crypto.PublicKey import RSA from Crypto.Signature import PKCS1_v1_5 from Crypto.Hash import SHA256 from urllib.parse import quote_plus from urllib.parse import urlparse, parse_qs from base64 import decodebytes, encodebytes import json import requests
class AliPay(object): """ 支付宝支付接口(PC端支付接口) """
def __init__(self, appid, app_notify_url, app_private_key_path, alipay_public_key_path, return_url, debug=False): self.appid = appid self.app_notify_url = app_notify_url self.app_private_key_path = app_private_key_path self.app_private_key = None self.return_url = return_url with open(self.app_private_key_path) as fp: self.app_private_key = RSA.importKey(fp.read()) self.alipay_public_key_path = alipay_public_key_path with open(self.alipay_public_key_path) as fp: self.alipay_public_key = RSA.importKey(fp.read())
if debug is True: self.__gateway = "https://openapi.alipaydev.com/gateway.do" else: self.__gateway = "https://openapi.alipay.com/gateway.do"
def direct_pay(self, subject, out_trade_no, total_amount, return_url=None, **kwargs): biz_content = { "subject": subject, "out_trade_no": out_trade_no, "total_amount": total_amount, "product_code": "FAST_INSTANT_TRADE_PAY", # "qr_pay_mode":4 }
biz_content.update(kwargs) data = self.build_body("alipay.trade.page.pay", biz_content, self.return_url) return self.sign_data(data)
def build_body(self, method, biz_content, return_url=None): data = { "app_id": self.appid, "method": method, "charset": "utf-8", "sign_type": "RSA2", "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "version": "1.0", "biz_content": biz_content }
if return_url is not None: data["notify_url"] = self.app_notify_url data["return_url"] = self.return_url
return data
def sign_data(self, data): data.pop("sign", None) # 排序后的字符串 unsigned_items = self.ordered_data(data) unsigned_string = "&".join("{0}={1}".format(k, v) for k, v in unsigned_items) sign = self.sign(unsigned_string.encode("utf-8")) # ordered_items = self.ordered_data(data) quoted_string = "&".join("{0}={1}".format(k, quote_plus(v)) for k, v in unsigned_items)
# 获得最终的订单信息字符串 signed_string = quoted_string + "&sign=" + quote_plus(sign) return signed_string
def ordered_data(self, data): complex_keys = [] for key, value in data.items(): if isinstance(value, dict): complex_keys.append(key)
# 将字典类型的数据dump出来 for key in complex_keys: data[key] = json.dumps(data[key], separators=(',', ':'))
return sorted([(k, v) for k, v in data.items()])
def sign(self, unsigned_string): # 开始计算签名 key = self.app_private_key signer = PKCS1_v1_5.new(key) signature = signer.sign(SHA256.new(unsigned_string)) # base64 编码,转换为unicode表示并移除回车 sign = encodebytes(signature).decode("utf8").replace("\n", "") return sign
def _verify(self, raw_content, signature): # 开始计算签名 key = self.alipay_public_key signer = PKCS1_v1_5.new(key) digest = SHA256.new() digest.update(raw_content.encode("utf8")) if signer.verify(digest, decodebytes(signature.encode("utf8"))): return True return False
def verify(self, data, signature): if "sign_type" in data: sign_type = data.pop("sign_type") # 排序后的字符串 unsigned_items = self.ordered_data(data) message = "&".join(u"{}={}".format(k, v) for k, v in unsigned_items) return self._verify(message, signature)
#请求退款接口 def api_alipay_trade_refund(self,refund_amount,out_trade_no=None,trade_no=None,**kwargs):
#构造参数体 biz_content = { "refund_amount":refund_amount}
#传递可选参数 biz_content.update(**kwargs)
#判断使用站外订单还是支付宝订单 if out_trade_no: biz_content["out_trade_no"] = out_trade_no if trade_no: biz_content["trade_no"] = trade_no
#构造支付接口地址 data = self.build_body("alipay.trade.refund",biz_content)
#构造url url = self.__gateway+"?" + self.sign_data(data)
#请求接口 r = requests.get(url) html = r.content.decode("utf-8")
return html
|