您当前的位置:首页 > 计算机 > 编程开发 > Python

python 使用spyne发布webservice 使用suds调用webservice示例

时间:01-18来源:作者:点击数:

python 使用spyne发布webservice 使用suds调用webservice示例

import logging
import socket
import json
import suds
try:
    import xml.etree.cElementTree as ET
except ImportError:
    import xml.etree.ElementTree as ET
from suds.client import Client
from suds.sax.element import Element
from suds.sax.attribute import Attribute
from spyne.application import Application
from spyne.decorator import rpc
from spyne import ServiceBase, Iterable, Integer, Unicode
from spyne.protocol.soap import Soap11, Soap12
from spyne.server.wsgi import WsgiApplication
from wsgiref.simple_server import make_server
from xml.dom.minidom import Document

class HimapWSManager:
    __himapwsurl = 'http://127.0.0.1:8080/himapws/CallServicePort?wsdl'
    __timeout = 30

    def __CallService(self, sProcessName, sRequestXml):
        client = Client(self.__himapwsurl, timeout=self.__timeout)
        #logging.info('调用流程:%s,请求包:%s' % (sProcessName, sRequestXml))
        result = client.service.callService("", sProcessName, sRequestXml, "admin", "fr4t3t1y6s6s1e0502c4d5")
        #logging.info('调用流程:%s,接收包:%s' % (sProcessName, result))
        return result

    def __TextIsNull(self, textValue, defaultValue=''):
        return textValue if textValue else defaultValue

    def __AddRequestChild(self, doc, parent, child_name, child_value):
        text = doc.createTextNode(child_value)
        child_node = doc.createElement(child_name)
        child_node.appendChild(text)
        parent.appendChild(child_node)

    def __ParseChargingDetailInterChangeXml(self, responsexml):
        try:
            root = ET.fromstring(responsexml)
        except BaseException as e:
            raise BaseException(e.message)
        response = {}
        returncode = root.find('RETURNCODE').text
        codetext = root.find('CODETEXT').text
        if returncode != 'SUCCEED':
            response['Success'] = '0'
            response['ErrorMessage'] = self.__TextIsNull(codetext, '失败')
            response['InvoiceNumber'] = ''
            response['ChargeDate'] = ''
        else:
            response['Success'] = '1'
            response['ErrorMessage'] = self.__TextIsNull(codetext, '成功')
            response['InvoiceNumber'] = self.__TextIsNull(root.find('response/InvoiceNumber').text)
            response['ChargeDate'] = self.__TextIsNull(root.find('response/ChargeDate').text)
        return response

    def __ParseSynchronizeChargingDetailXml(self, responsexml):
        try:
            root = ET.fromstring(responsexml)
        except BaseException as e:
            raise BaseException(e.message)
        response = []
        returncode = root.find('RETURNCODE').text
        codetext = root.find('CODETEXT').text
        if returncode != 'SUCCEED':
            record = {}
            record['Success'] = '0'
            record['ErrorMessage'] = self.__TextIsNull(codetext, '失败')
            response.append(record)
        else:
            for Item in root.findall('response'):
                record = {}
                record['DetailedChargesId'] = self.__TextIsNull(Item.find('DetailedChargesId').text)
                record['HospCode'] = self.__TextIsNull(Item.find('HospCode').text)
                record['His_Keyno'] = self.__TextIsNull(Item.find('His_Keyno').text)
                record['ChargeCode'] = self.__TextIsNull(Item.find('ChargeCode').text)
                record['Success'] = self.__TextIsNull(Item.find('Success').text)
                record['ErrorMessage'] = self.__TextIsNull(Item.find('ErrorMessage').text)
                record['InvoiceNumber'] = self.__TextIsNull(Item.find('InvoiceNumber').text)
                record['ChargeDate'] = self.__TextIsNull(Item.find('ChargeDate').text)
                response.append(record)
        return response

    def CallChargingDetailInterChange(self, sHisId, sApplyNumber, sChargeJson, sApplyJson):
        logging.info('收费请求,HisId:%s,ApplyNumber:%s' % (sHisId, sApplyNumber))
        doc = Document()  # 创建DOM文档对象
        root = doc.createElement('request')  # 创建根元素
        doc.appendChild(root)  # 根节点插入doc树
        self.__AddRequestChild(doc, root, 'sHisId', sHisId)
        self.__AddRequestChild(doc, root, 'sApplyNumber', sApplyNumber)
        self.__AddRequestChild(doc, root, 'sChargeJson', sChargeJson)
        self.__AddRequestChild(doc, root, 'sApplyJson', sApplyJson)
        docxml = doc.toprettyxml()
        requestxml = docxml.replace('<?xml version="1.0" ?>', '')
        # requestxml = '<request><sHisId>?</sHisId><sApplyNumber>?</sApplyNumber><sChargeJson>?</sChargeJson><sApplyJson>?</sApplyJson></request>'
        responsexml = self.__CallService('PROC_PA_CENTER_CHARGE', requestxml)
        responsejson = json.dumps(self.__ParseChargingDetailInterChangeXml(responsexml), ensure_ascii=False)
        logging.info('收费返回,HisId:%s,ApplyNumber:%s,JsonStr:%s' % (sHisId, sApplyNumber, responsejson))
        return responsejson

    def CallSynchronizeChargingDetail(self, sHisId, sApplyNumber, sChargeJson):
        logging.info('同步请求,HisId:%s,ApplyNumber:%s' % (sHisId, sApplyNumber))
        doc = Document()  # 创建DOM文档对象
        root = doc.createElement('request')  # 创建根元素
        doc.appendChild(root)  # 根节点插入doc树
        self.__AddRequestChild(doc, root, 'sHisId', sHisId)
        self.__AddRequestChild(doc, root, 'sApplyNumber', sApplyNumber)
        self.__AddRequestChild(doc, root, 'sChargeJson', sChargeJson)
        docxml = doc.toprettyxml()
        requestxml = docxml.replace('<?xml version="1.0" ?>', '')
        # requestxml = '<request><sHisId>?</sHisId><sApplyNumber>?</sApplyNumber><sChargeJson>?</sChargeJson></request>'
        responsexml = self.__CallService('PROC_PA_CENTER_CHARGE_DETAIL', requestxml)
        responsejson = json.dumps(self.__ParseSynchronizeChargingDetailXml(responsexml), ensure_ascii=False)
        logging.info('同步返回,HisId:%s,ApplyNumber:%s,JsonStr:%s' % (sHisId, sApplyNumber, responsejson))
        return responsejson

class HimapWSProvider(ServiceBase):
    @rpc(Unicode, Unicode, Unicode, Unicode, _returns=Unicode)
    def ChargingDetailInterChange(self, sHisId, sApplyNumber, sChargeJson, sApplyJson):
        himapwsmanager = HimapWSManager()
        return himapwsmanager.CallChargingDetailInterChange(sHisId, sApplyNumber, sChargeJson, sApplyJson)

    @rpc(Unicode, Unicode, Unicode, _returns=Unicode)
    def SynchronizeChargingDetail(self, sHisId, sApplyNumber, sChargeJson):
        himapwsmanager = HimapWSManager()
        return himapwsmanager.CallSynchronizeChargingDetail(sHisId, sApplyNumber, sChargeJson)

application = Application([HimapWSProvider], 'http://schemas.xmlsoap.org/soap/envelope', in_protocol=Soap11(validator='lxml'), out_protocol=Soap11())
wsgi_application = WsgiApplication(application)

if __name__ == '__main__':
    # result = ParseChargingDetailInterChangeXml(xml_charge)
    # print(result)
    logging.basicConfig(level=logging.INFO,format='%(asctime)s %(levelname)s:%(message)s')
    logging.getLogger('spyne.protocol.xml').setLevel(logging.INFO)
    hostname = socket.gethostname()
    ip = socket.gethostbyname(hostname)
    port = 8081
    logging.info('wsdl service url: http://%s:%d/HimapWSProvider?wsdl' % (ip, port))
    server = make_server(ip, port, wsgi_application)
    logging.info("service startup successfully")
    server.serve_forever()
方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门