系统开发

介绍使用Java代码来开发基于Biz-SIP业务中台的系统。

一、概述

Biz-SIP中间件在开发应用时,主要涉及配置文件编写和Java代码开发,系统架构如下所示:

image.png

上图中蓝底黑字所呈现的部分,就是需要Java代码开发的,涉及适配(source)层、应用(app)层和领域(sink)层。蓝色虚箭头线所涉及的接口,就是不同层次之间需要互相依赖的接口定义。

二、source层

Source服务位于适配层,主要功能是接入外部发起的服务,主要功能有:

  • 通讯适配:和外部接入通讯方式的适配,接收外部发起的请求消息,并把处理过后的响应消息送回。
  • 消息转换:把外部发来的请求消息,转换成Biz-SIP平台内部消息格式;并把平台的响应消息,打包成外部的消息格式。
  • app服务调用:往后调用app层服务。
  • 渠道接入的安全处理,包括报文加解密、加签解签;
  • 渠道接入的报文校验;
  • 渠道接入的交易处理,包括流水记录、风控处理、终端管理等;

1. pom.xml配置

在pom.xml文件中添加source-spring-boot-starter依赖,以及应用聚合层app模块和领域层sink模块相关联的client接口相关包:

        <dependency>
            <groupId>com.bizmda.bizsip</groupId>
            <artifactId>source-spring-boot-starter</artifactId>
        </dependency>

2. application.yml配置

Source模块的application.yml中的主要相关参数:

配置项 配置说明
bizsip.config-path Biz-SIP中间件配置文件根目录位置
bizsip.integrator-url App服务聚合器的接口url地址

例子:

server:
  port: 8080

spring:
  application:
    name: bizsip-sample-source

  cloud:
    nacos:
      discovery:
        server-addr: bizsip-nacos:8848

bizsip:
  config-path: /var/bizsip/config
  integrator-url: http://bizsip-integrator/api

logging:
  level:
    com.bizmda.bizsip: debug

3. Source模块开发

Source模块主要负责通讯接入的适配、消息解包、消息打包、调用App服务,以及其它个性化处理。

3.1 通讯接入的适配

针对不同的通讯接入方式,source层会采用不同的接入方式,主要有:

  • Controller:接入http的请求,包括RESTful报文请求接入;
  • Netty:接入TCP协议的请求;
  • RabbitMQ:实现异步转换成同步的接入;
  • 等等…

3.2 source模块打包和解包

平台提供JSON、XML、定长、有分隔符、ISO8583的格式转换器(converter),如果采用这些平台提供的converter,就需要约定Source ID。
首先,应在在source.yml中配置相应的converter:

- id: xml-source
  converter:
    type: simple-xml

然后,就可以在代码中通过Converter.getSourceConverter()绑定converter,进行相应的解包和打包操作:

@RestController
@RequestMapping("/personal")
public class XmlController {
    ......
    // 获取source层消息转换句柄
    private Converter converter = Converter.getSourceConverter("xml-source");

    @PostMapping(value = "/getCustomerAndAccountList", consumes = "application/xml", produces = "application/xml")
    public String getCustomerAndAccountList(@RequestBody String inMessage) throws BizException {
        // 消息解包操作
        JSONObject jsonObject = this.converter.unpack(inMessage.getBytes());
        String customerId = (String)jsonObject.get("customerId");
        CustomerAndAccountList customerAndAccountList = this.personalAppInterface.getCustomerAndAccountList(customerId);
        jsonObject = JSONUtil.parseObj(customerAndAccountList);
        // 消息打包并返回
        return new String(this.converter.pack(jsonObject));
    }
  	...
 }

如果采用Java编程实现报文打解包,就可以不考虑Source ID的约定。

3.3 交易处理

对于简单的交易处理,可以考虑用Spring Service来做交易处理。
在编码时注意,调用app层聚合服务可能会导致时延较长,应避免在有数据库操作的Spring Service类中调用app层聚合服务,并在有数据库操作的Spring Service类上尽量加@Transactional,以避免长时间锁表。

3.4 调用app层服务

统一采用“SourceClientFactory.getAppServiceClient(Class tClass,String appServiceId)”来调用app层聚合服务(tClass必须是接口类),二种调用方式:

  • 接口约定调用:由app层定义接口调用的Interface接口类,由渠道接入模块所引用。如下例所示:
@RestController
@RequestMapping("/personal")
public class PersonalController  {
    private PersonalAppInterface personalAppInterface = SourceClientFactory
            .getAppServiceClient(PersonalAppInterface.class,"app/personal");
	...

    @GetMapping(value ="/getCustomerAndAccountList")
    public CustomerAndAccountList getCustomerAndAccountList(String customerId) {
        return this.personalAppInterface.getCustomerAndAccountList(customerId);
    }

    @GetMapping(value ="/getAccountListByCustomerId")
    public List<Account> getAccountListByCustomerId(String customerId) {
        return this.personalAppInterface.getAccountListByCustomerId(customerId);
    }

    @GetMapping(value ="/getCustomer")
    public Customer getCustomer(String customerId) {
        return this.personalAppInterface.getCustomer(customerId);
    }
	...
}

  • 非接口约定调用:采用平台通用JSONObject类型,Interface接口类统一采用BizMessageInterface接口类,并统一用call()来调用app层聚合服务。如下例所示:
@RestController
@RequestMapping("/personal")
public class PersonalController  {
    private BizMessageInterface payment1SinkInterface = SourceClientFactory
            .getAppServiceClient(BizMessageInterface.class,"sink/payment1");
    ...
        
    @GetMapping(value ="/send2Payment")
    public BizMessage<JSONObject> send2Payment(String message) throws BizException {
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("message",message);
        return this.payment1SinkInterface.call(jsonObject);
    }
}

4. 可复用Source模块代码框架

Source模块负责外部应用的接入,外部应用的通讯接入模块,理论上是可复用的,可复用Source模块代码框架,是把通用的Source通讯适配应用和个性化的Source服务分开,从而达到通讯接入模块可复用的目标,如下所示:

image.png

可复用Source模块框架中包括Source通讯适配应用和Source服务二块:

  • Source通讯适配应用:主要负责和外部应用的通讯连接,根据不同的通讯方式可以选择不同的通讯适配应用来对接,通讯适配应用后续会不断扩展,可以选择已有的Source通讯适配应用,也可以在此框架上,开发自己的Source通讯适配应用。
  • Source服务:是被Source通讯适配应用调用的Spring容器类,一般是实现SourceServiceInterface接口的doService()方法,在此方法中完成消息解包、消息打包、App服务调用等步骤。

SourceServiceInterface接口如下:

/**
 * Source服务接口
 */
public interface SourceServiceInterface {
    /**
     *
     * @param data 传入Source服务的数据
     * @return Source服务返回数据
     * @throws BizException
     */
    Object doService(Object data) throws BizException;
}

在可复用的Source模块中,可以通过@Autowired进行注入引用,从而实现通用的Source通讯适配应用和个性化的Source服务代码分离的目标:

@RestController
public class RestSourceController {
    @Autowired
    private SourceBeanInterface restSourceService;

5. 典型的可复用Source模块介绍

在source模块下,有Netty和RESTful接入的source模块,可以参考:

├── source      Source服务接入子模块
│   ├── netty-source		Netty接入Source子模块
│   ├── rest-source			Restful接入Source

3.1 rest-source模块(基于JSON格式POST请求的RESTful服务)

rest-source模块是提交JSON格式的POST请求,模块获取POST的JSON报文和HTTP请求头的信息,提交给Source服务进行处理。
doService()接口描述:

  • 参数:RestSourceDTO对象
public class RestSourceDTO {
    // HTTP请求头Map
    private Map<String,String> headerMap;
    // 上送的JSON报文
    private JSONObject jsonObjectData;
}
  • 返回:JSONObject

3.2 netty-source模块(基于Netty的同步短连接)

netty-source模块是基于Netty的TCP服务端接入,涉及application.yml配置:

netty:
  port: 10002

netty.port为TCP服务端的侦听端口。
doService()接口描述:

  • 参数:byte[]
  • 返回:byte[]

6. OpenAPI接口的开发建议

OpenAPI接口模块主要功能:

  • 敏感数据加解密及报文加签验签;
  • RESTful接口封装;
  • OpenAPI接口文档的生成;
  • Sandbox;
  • 聚合服务的调用

OpenAPI接口模块如下例:

@RestController
@RequestMapping("/personal")
public class OpenapiController {
    private PersonalAppInterface personalAppInterface = SourceClientFactory
            .getBizServiceClient(PersonalAppInterface.class,"app/personal");
    private BizMessageInterface payment1SinkInterface = SourceClientFactory
            .getBizServiceClient(BizMessageInterface.class,"sink/payment1");

    @GetMapping(value ="/getCustomerAndAccountList")
    public CustomerAndAccountList getCustomerAndAccountList(String customerId) {
        return this.personalAppInterface.getCustomerAndAccountList(customerId);
    }

    @GetMapping(value ="/getAccountListByCustomerId")
    public List<Account> getAccountListByCustomerId(String customerId) {
        return this.personalAppInterface.getAccountListByCustomerId(customerId);
    }

    @GetMapping(value ="/getCustomer")
    public Customer getCustomer(String customerId) {
        return this.personalAppInterface.getCustomer(customerId);
    }

    @GetMapping(value ="/getCustomerAndSaf2Payment2")
    public Customer getCustomerAndSaf2Payment2(String tranCode, String customerId) throws BizException {
        return this.personalAppInterface.getCustomerAndSaf2Payment2(tranCode,customerId);
    }

    @GetMapping(value ="/send2Payment1")
    public BizMessage<JSONObject> send2Payment1(String message) throws BizException {
        return this.personalAppInterface.send2Payment1(message);
    }

    @GetMapping(value ="/send2Payment")
    public BizMessage<JSONObject> send2Payment(String message) throws BizException {
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("message",message);
        return this.payment1SinkInterface.call(jsonObject);
    }
}

OpenAPI接口模块可以根据接口要求开发controller类,并加上API注释,自动生成Swagger/Knife4j文档。

三、app层

app层服务功能:

  • 领域层服务(Sink服务)的编排
  • 领域层服务(Sink服务)透传到适配层服务(Source服务和OpenAPI)
  • 存储转发服务的封装
  • 补偿交易的封装

1. pom.xml配置

在pom.xml文件中添加app-spring-boot-starter依赖(1.0.0.Beta8版本之前是依赖integrator-spring-boot-starter),以及领域层sink模块相关联的client接口相关包:

        <dependency>
            <groupId>com.bizmda.bizsip</groupId>
            <artifactId>app-spring-boot-starter</artifactId>
        </dependency>

2. application.yml配置

app模块中的application.yml中的主要相关参数:

配置项 配置说明
bizsip.config-path Biz-SIP中间件配置文件根目录位置
bizsip.rabbitmq-log success:会把交易日志(成功)发送给rabbitMQ
suspend:会把交易日志(成功、挂起)发送给rabbitMQ
fail:会把交易日志(成功、挂起、失败)发送给rabbitMQ
交易日志的rabbitMQ相关发送参数:
- exchange:exchange.direct.bizsip.log
- RoutingKey:queue.log.routing.key
server.port App服务整合器的微服务端口,建议用8888端口,避免和其它端口相冲突
spring.cloud.nacos.discovery.server-addr Nacos服务端口
spring.datasource.* 数据库连接配置(用于服务脚本中db对象)
spring.redis.* redis连接配置(用于服务脚本中redis对象)
spring.rabbitmq.* RabbitMQ配置(用于事务管理器)

例子:

server:
  port: 8888

spring:
  application:
    name: bizsip-integrator

  cloud:
    nacos:
      discovery:
        server-addr: bizsip-nacos:8848
#  以下配置在Istio部署中打开,以不采用NACOS注册中心,而采用etcd注册机制
#  cloud:
#    service-registry:
#      auto-registration:
#        enabled: false  #禁用注册中心服务

  datasource:
    url: jdbc:mysql://bizsip-mysql/sip
    username: root
    password: bizsip123456
    driver-class-name: com.mysql.jdbc.Driver

  redis:
    redisson:
      enable: true
    host: bizsip-redis
    port: 6379
    timeout: 6000
    database: 0
    lettuce:
      pool:
        max-active: 10 # 连接池最大连接数(使用负值表示没有限制),如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)
        max-idle: 8   # 连接池中的最大空闲连接 ,默认值也是8
        max-wait: 100 # # 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException
        min-idle: 2    # 连接池中的最小空闲连接 ,默认值也是0
      shutdown-timeout: 100ms

  rabbitmq:
    virtual-host: /
    host: bizsip-rabbitmq
    port: 5672
    username: springcloud
    password: springcloud
    listener:
      simple:
        concurrency: 5
        max-concurrency: 15
        prefetch: 10

bizsip:
  config-path: /var/bizsip/config
	rabbitmq-log: fail
  
logging:
  level:
    com.bizmda.bizsip: debug

3. 二种app服务类型

聚合应用服务和延迟服务,有二种服务类型:bean-service和app-bean-service。

3.1 基于自定义接口类的bean-service

bean-service:基于Java接口类调用的服务类型,绑定的服务类是基于预先约定好的Java接口。
bean-service在App层服务配置文件(app.yml)中,如下例所示:

app.yml:
- app-service-id: app/sample-bean-service
  type: bean-service
  class-name: com.sample.app.service.SampleBeanService


具体的bean-service代码,要求是一个Spring Service容器类,实现一个约定的App层服务接口,如下例所示:

@Service
public class SampleBeanService implements SampleBeanServiceInterface {
    private BizMessageInterface sampleSinkBeankSinkInterface = AppClientFactory
            .getSinkClient(BizMessageInterface.class,"sample-sink-bean-sink");
    private HelloInterface helloInterface = AppClientFactory
            .getSinkClient(HelloInterface.class,"sample-bean-sink");
    @Override
    public String callSampleSinkBeanSink(String message) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("message",message);
        BizMessage<JSONObject> bizMessage;
        try {
            bizMessage = sampleSinkBeankSinkInterface.call(jsonObject);
        } catch (BizException e) {
            e.printStackTrace();
            return null;
        }
        return (String)bizMessage.getData().get("message");
    }

    @Override
    public String callSampleBeanSink(String message) {
        return this.helloInterface.hello(message);
    }
}

3.2 基于平台内部JSON消息接口的app-bean-service

app-bean-service:基于平台标准JSON接口调用的服务类型,绑定的服务类是继承AppBeanInterface接口。
app-bean-service在App层服务配置文件(app.yml)中,如下例所示:

app.yml:
- app-service-id: app/sample-app-bean-service
  type: app-bean-service
  class-name: com.sample.app.service.SampleAppBeanService


具体的app-bean-service代码,要求是一个Spring Service容器类,实现AppBeanInterface接口,如下例所示:

@Service
public class SampleAppBeanService implements AppBeanInterface {
    private HelloInterface helloInterface = AppClientFactory
            .getSinkClient(HelloInterface.class,"sample-bean-sink");

    @Override
    public JSONObject process(JSONObject jsonObject) throws BizException {
        String message = (String)jsonObject.get("message");
        jsonObject.set("message","sample-app-bean-service: Hello,"+message+";"
                + this.helloInterface.hello(message));
        return jsonObject;
    }
}

4. 调用Sink服务

统一采用“AppClientFactory.getSinkClient(Class tClass,String sinkId)”来调用领域层Sink服务(tClass必须是接口类),有二种调用方式:

  • 自定义接口调用:由app层定义接口调用的Interface接口类,由渠道接入模块所引用。如下例所示:
@Service
public class PersonalAppService implements PersonalAppInterface {
    private AccountSinkInterface accountSinkInterface = AppClientFactory
            .getSinkClient(AccountSinkInterface.class,"account-sink");
    private CustomerSinkInterface customerSinkInterface = AppClientFactory
            .getSinkClient(CustomerSinkInterface.class,"customer-sink");
	...
        
    @Override
    public CustomerAndAccountList getCustomerAndAccountList(String customerId) {
        Customer customer = this.customerSinkInterface.getCustomer(customerId);
        List<Account> accountList = this.accountSinkInterface.getAccountListByCustomerId(customerId);
        CustomerAndAccountList customerAndAccountList = new CustomerAndAccountList();
        customerAndAccountList.setCustomer(customer);
        customerAndAccountList.setAccountList(accountList);
        return customerAndAccountList;
    }
	...
}
  • 平台标准接口调用:采用平台通用JSONObject类型,Interface接口类统一采用BizMessageInterface接口类,并统一用call()来调用app层聚合服务。如下例所示:
@Service
public class PersonalAppService implements PersonalAppInterface {
	...
    private BizMessageInterface payment1SinkInterface = AppClientFactory
            .getSinkClient(BizMessageInterface.class,"payment1-sink");
    private BizMessageInterface payment2SinkInterface = AppClientFactory
            .getSinkClient(BizMessageInterface.class,"payment2-sink");
	...
    @Override
    public BizMessage<JSONObject> send2Payment1(Object message) throws BizException {
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("message",message);
        return this.payment1SinkInterface.call(jsonObject);
    }

    @Override
    public BizMessage send2Payment2(String tranMode, String tranCode, Object message) throws BizException {
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("tranCode",tranCode);
        jsonObject.set("tranMode",tranMode);
        jsonObject.set("message",message);
        return this.payment2SinkInterface.call(jsonObject);
    }
    ...
}

5. 调用延迟App服务

延迟服务也是位于app层的聚合服务,延迟服务只能由app层的聚合服务来进行嵌套调用,不能从适配层直接调用延迟服务。
App调用延迟服务,统一采用“AppClientFactory.getDelayAppServiceClient(Class tClass, String bizServiceId, int… delayMilliseconds)”来调用(tClass必须是接口类),同样有自定义接口调用和平台标准接口调用二种方式,其中平台标准接口调用是采用BizMessageInterface接口类,并统一用call()来调用app层聚合服务,如下例所示:

@Service
public class PersonalAppService implements PersonalAppInterface {
    private AccountSinkInterface accountSinkInterface = AppClientFactory
            .getSinkClient(AccountSinkInterface.class,"account-sink");
    private CustomerSinkInterface customerSinkInterface = AppClientFactory
            .getSinkClient(CustomerSinkInterface.class,"customer-sink");
    private BizMessageInterface payment1SinkInterface = AppClientFactory
            .getSinkClient(BizMessageInterface.class,"payment1-sink");
    private BizMessageInterface payment2SinkInterface = AppClientFactory
            .getSinkClient(BizMessageInterface.class,"payment2-sink");
    private PersonalAppInterface personalAppDelayInterface = AppClientFactory
            .getDelayAppServiceClient(PersonalAppInterface.class,"app/personal",
                    0,1000,2000,4000,8000,16000,32000);
	...
    @Override
    public void payoutForward(String tranMode,String accountId, long amount) throws BizException {
        log.info("account出金:{},{}",accountId,amount);
        this.accountSinkInterface.payout(accountId,amount);
        JSONObject jsonObject = new JSONObject();
        jsonObject.set("tranCode","pay");
        jsonObject.set("tranMode",tranMode);
        jsonObject.set("accountId",accountId);
        jsonObject.set("tranAmount",amount);
        BizMessage<JSONObject> bizMessage = null;
        try {
            log.info("payment缴费...");
            bizMessage = this.payment2SinkInterface.call(jsonObject);
        } catch (BizException e) {
            if (e.isTimeOutException()) {
                log.info("payment交易超时,开始payout补偿...");
                this.personalAppDelayInterface.payoutForwardCompensate(jsonObject);
                return;
            }
            else {
                throw e;
            }
        }
        log.info("payment缴费成功!");
        log.info("payout成功!");
    }
}

6. App服务入口报文校验

在app层中,有对app层服务的校验机制,包括域级校验和服务级校验,可以考虑统一在app层进行报文检验配置:

config/check-rule/openapi/sample1.yml

field-check-rules:
  - field: email
    rule: isEmail
    message: '不是邮箱地址:{}'
  - field: sex
    rule: notEmpty
    message: '不能为空'
  - field: mobile
    rule: isMatchRegex
    args:
      - '^[1][3,4,5,6,7,8,9][0-9]{9}$'
    message: '不是手机号{}'
field-check-mode: one
service-check-rules:
  - script: if(data.sex == '1')
            {return '性别不符合!';}
    message: '额度超限'
service-check-mode: one

但如果是仅针对于该特定渠道的报文校验,就只能考虑在渠道接入模块做个性化处理。

四、sink层

sink层主要包括二类模块:第三方接入模块和交易处理模块:

  • 第三方系统接入模块,会涉及第三方应用的调用,包括同步调用和异步调用。
  • 交易处理模块就是一种特殊的Sink模块,是通过connector接收RestController发来的服务请求,并进行一系列内部交易处理后返回,不涉及第三方应用的调用。

1. pom.xml配置

在pom.xml文件中添加sink-spring-boot-starter依赖:

        <dependency>
            <groupId>com.bizmda.bizsip</groupId>
            <artifactId>sink-spring-boot-starter</artifactId>
        </dependency>

2. application.yml配置

领域层Sink应用的application.yml中的主要相关参数:

配置项 配置说明
bizsip.config-path Biz-SIP中间件配置文件根目录位置
bizsip.sink-id(可选) sink-id可以配置多个以“,”号分隔的id,应用会自动加载根据sink.yml中对应sink id的sink服务(包括rest同步sink服务调用、rabbitmq异步sink服务调用)
bizsip.rabbitmq-log success:会把交易日志(成功)发送给rabbitMQ
suspend:会把交易日志(成功、挂起)发送给rabbitMQ
fail:会把交易日志(成功、挂起、失败)发送给rabbitMQ
交易日志的rabbitMQ相关发送参数:
- exchange:exchange.direct.bizsip.log
- RoutingKey:queue.log.routing.key

如下例:

server:
  port: 8001
spring:
  application:
    name: customer-sink

  cloud:
    nacos:
      discovery:
        server-addr: bizsip-nacos:8848

  datasource:
    url: jdbc:mysql://bizsip-mysql/xbank?autoReconnect=true
    username: root
    password: bizsip123456
    driver-class-name: com.mysql.cj.jdbc.Driver

bizsip:
  config-path: /var/bizsip/config
  sink-id: sink1,sink2,sink3,sink4,sink5,sink6,sink7,sink9,sink10,sink11,sink12,sink13,sink14,netty,sink15,sink22,sink23,sink24,sink25
  rabbitmq-log: success

logging:
  level:
    com.bizmda.bizsip: debug
    com.xbank: trace

3. Sink服务调用方式

在sink.yml中,每个sink都需要定义type属性,约定针对此sink服务的调用方式,type属性包括:

  • rest:通过RESTful同步调用Sink服务;
  • rabbitmq:通过RabbitMQ异步调用Sink服务。

3.1 rest(同步调用Sink服务)

Sink模块是通过RESTful接口来接收app层发来的同步服务请求,主要有二种方式:

  • 一是可以直接在application.yml中配置bizsip.sink-id属性,在application启动时扫描sink.yml中配置的sink,对于type属性为“rest”的sink,会根据sink.yml中对应sink的url属性自动接入path点。

application.yml中应配置bizsip.sink-id属性:

bizsip:
  sink-id: sink1,sink2,sink3,sink4,sink5,sink6
  • 二是可以直接写RestController,以获得更灵活的定制:
@RestController
public class Sink2Controller {
    private Converter converter = Converter.getSinkConverter("sink2");
    private Connector connector = Connector.getSinkConnector("sink2");

    @PostMapping(value = "/sink2", consumes = "application/json", produces = "application/json")
    public BizMessage<JSONObject> doService(@RequestBody BizMessage<JSONObject> inMessage, HttpServletResponse response) {
        log.debug("inMessage:{}", inMessage);
        try {
            byte[] packedMessage = this.converter.pack(inMessage.getData());
            byte[] returnMessage = this.connector.process(packedMessage);
            JSONObject jsonObject = this.converter.unpack(returnMessage);
            return BizMessage.buildSuccessMessage(inMessage,jsonObject);
        } catch (BizException e) {
            log.error("服务端适配器执行出错",e);
            return BizMessage.buildFailMessage(inMessage,e);
        }
    }
}

从上面的例子可以看出,在开发的RestController中,可以灵活地进行消息解包、调用配置的connector,最后消息打包返回。

3.2 rabbitmq(异步调用Sink服务)

type属性为rabbitmq的Sink模块,是通过RabbitMQ消息队列,来接收app层发来的异步服务请求,主要有二种方式:

  • 一是可以直接在application.yml中配置bizsip.sink-id属性,在application启动时扫描sink.yml中配置的sink,对于rabbbitmq型的sink,会根据sink.yml中对应sink的exchange、routing-key、queue来约定Rabbitmq消息传递的exchange、routing key,以及绑定的接收队列(exchange缺省为exchange.dircect.bizsip.sink,routing key缺省为对应的sink id,绑定queue缺省为queue.bizsip.sink.{sink id})。
  • 二是可以自己开发SinkRabbitmqListener,以获得更灵活的定制,具体写法可以参考SinkRabbitmqListener源码:
@Service
@ConditionalOnProperty(name = "bizsip.rabbitmq.queue",matchIfMissing = false)
public class SinkRabbitmqListener {
    @Value("${bizsip.sink-id}")
    private String sinkId;
    @Value("${bizsip.rabbitmq.queue}")
    private String queue;
    @Value("${bizsip.rabbitmq.exchange}")
    private String exchange;
    @Value("${bizsip.rabbitmq.routing-key}")
    private String routingKey;

    @Value("${bizsip.rabbitmq-log:false}")
    private boolean rabbitmqLog;

    private Converter converter = null;
    private Connector connector = null;

    @Autowired
    private RabbitTemplate rabbitTemplate;
    private Jackson2JsonMessageConverter jackson2JsonMessageConverter =new Jackson2JsonMessageConverter();

    @PostConstruct
    public void init() {
        if (this.exchange == null) {
            log.error("配置文件中bizsip.rabbitmq.exchange没有配置,SinkRabbitmqListener初始化失败!");
        }
        if (this.queue == null) {
            log.error("配置文件中bizsip.rabbitmq.queue没有配置,SinkRabbitmqListener初始化失败!");
        }
        if (this.routingKey == null) {
            log.error("配置文件中bizsip.rabbitmq.routing-key没有配置,SinkRabbitmqListener初始化失败!");
        }
        if (this.sinkId == null) {
            log.error("配置文件中bizsip.sink-id没有配置,SinkRabbitmqListener初始化失败!");
            return;
        }
        this.converter = Converter.getSinkConverter(this.sinkId);
        this.connector = Connector.getSinkConnector(this.sinkId);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${bizsip.rabbitmq.queue}", durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = "${bizsip.rabbitmq.exchange}", type = ExchangeTypes.DIRECT),
            key = "${bizsip.rabbitmq.routing-key}"))
    public void doService(Message message) {
        BizMessage<JSONObject> inMessage;
        inMessage = (BizMessage) jackson2JsonMessageConverter.fromMessage(message);
        if (!(inMessage.getData() instanceof JSONObject)) {
            inMessage.setData(JSONUtil.parseObj(inMessage.getData()));
        }
        JSONObject jsonObject = inMessage.getData();
        try {
            jsonObject = this.process(jsonObject);
            this.sendSuccessLog(inMessage,BizMessage.buildSuccessMessage(inMessage,jsonObject));
        } catch (BizException e) {
            this.sendFailLog(inMessage,BizMessage.buildFailMessage(inMessage,e));
        }
    }

    ......
}

4. Sink服务处理方式

sink.yml中可以约定每个sink的服务处理方式,这是通过sink的processor属性来设置的,processor包括以下属性值:

  • default:采用缺省的、无代码开发的的处理方式;
  • sink-bean:通过编写平台标准接口的Spring容器类(实现SinkBeanInterface接口)来进行处理;
  • bean:通过编写实现自定义接口的Spring容器类来进行处理。

4.1 default

在sink.yml中sink的processor设置为default时,sink服务是采用默认的缺省Sink服务流程来处理的,无需任何代码开发,处理流程如下:

4.2 sink-bean

在sink.yml中sink的processor设置sink-bean时,sink服务是会调用实现sink-bean接口(SinkBeanInterface接口)的sink-bean服务类(要求为Spring容器类),具体类名是由sink.yml中当前sink的class-name来指定的。
在sink-bean服务类中,可以调用sink.yml中配置的connector和converter,如果sink-bean服务类是继承AbstractSinkService类,当前sink的connector和converter是会自动注入的。
sink-bean服务类如下例所示:

@Service
public class Payment1SinkService extends AbstractSinkService implements SinkBeanInterface {
    @Override
    public JSONObject process(JSONObject jsonObject) throws BizException {
        log.info("传入消息:\n{}", BizUtils.buildJsonLog(jsonObject));
        byte[] inBytes = this.converter.pack(jsonObject);
        log.info("打包后消息:\n{}", BizUtils.buildHexLog(inBytes));
        JSONObject outJsonObject = this.converter.unpack(inBytes);
        log.info("解包后消息:\n{}", BizUtils.buildJsonLog(outJsonObject));
        return outJsonObject;
    }
}

采用sink-bean服务,接口不用提前约定,扩展性强,但缺点也是很明显,调用双方缺乏接口约定的机制。
在交易类型超过5个时,建议采用Command Executor(命令处理程序),分成多个xxxCmdExe类来进行处理,这些类应统一继承AbstractSinkBeanCmdExe类,并统一在execute()中实现业务逻辑处理。

@Service
public class Payment2SinkService implements SinkBeanInterface {
    @Autowired
    private TimeoutCmdExe timeoutCmdExe;
    @Autowired
    private TimeoutAndFailCmdExe timeoutAndFailCmdExe;
    @Autowired
    private TimeoutAndSuccessCmdExe timeoutAndSuccessCmdExe;
    @Autowired
    private SuccessCmdExe successCmdExe;

    @Override
    public JSONObject process(JSONObject jsonObject) throws BizException {
        log.info("传入消息:\n{}", jsonObject.toString());
        AbstractSinkBeanCmdExe sinkBeanCmdExe;
        String tranMode = (String)jsonObject.get("tranMode");
        switch (tranMode) {
            case "timeout":
                // 收到交易后,永远返回超时
                return timeoutCmdExe.execute(jsonObject);
            case "3timeout-fail":
                // 收到交易后,前3次返回超时,第4次返回失败码
                return timeoutAndFailCmdExe.execute(jsonObject);
            case "3timeout-success":
                // 收到交易后,前3次返回超时,第4次成功返回原报文
                return timeoutAndSuccessCmdExe.execute(jsonObject);
            default:
                //其它情况,成功返回原报文
                return successCmdExe.execute(jsonObject);
        }
    }
}

Command Executor(命令处理程序)如下例:
@Service
public class SuccessCmdExe extends AbstractSinkBeanCmdExe {
    @Override
    public JSONObject execute(JSONObject jsonObject) throws BizException {
        String tranCode = (String)jsonObject.get("tranCode");
        log.info("交易:{},返回交易成功!",tranCode);
        return jsonObject;
    }
}

4.3 bean

在sink.yml中sink的processor设置为bean时,sink服务是会调用实现基于开发者自己约定接口实现的bean服务类(要求为Spring容器类),具体类名是由sink.yml中当前sink的class-name来指定的。
在bean服务类中,可以调用sink.yml中配置的connector和converter,如果bean服务类是继承AbstractSinkService类,当前sink的connector和converter是会自动注入的。
bean服务代码如下例所示:

@Service
public class SampleBeanService implements HelloInterface {
    @Override
    public String hello(String message) {
        return "sample-bean-sink: Hello," + message;
    }
}

bean服务接口明确清晰,调用方一目了然,但需要调用双方提前约定Interface接口类。
在交易类型超过5个时,建议采用Command Executor(命令处理程序),分成多个xxxCmdExe类来进行处理,这些类应统一继承AbstractBeanCmdExe类。

@Service
public class AccountSinkService implements AccountSinkInterface {
    @Autowired
    private PayoutCmdExe payoutCmdExe;
    @Autowired
    private  PayoutCompensationCmdExe payoutCompensationCmdExe;
    @Autowired
    private GetAccountListByCustomerIdCmdExe getAccountListByCustomerIdCmdExe;

    @Override
    public List<Account> getAccountListByCustomerId(String customerId) {
        return this.getAccountListByCustomerIdCmdExe.getAccountListByCustomerId(customerId);
    }

    @Override
    public Account payout(String accountId, long amount) {
        return this.payoutCmdExe.payout(accountId,amount);
    }

    @Override
    public Account payoutCompensation(String accountId, long amount) {
        return this.payoutCompensationCmdExe.payoutCompensation(accountId,amount);
    }
}

Command Executor(命令处理程序)如下例:
@Service
public class GetAccountListByCustomerIdCmdExe extends AbstractBeanCmdExe {
    @Autowired
    private AccountService accountService;

    public List<Account> getAccountListByCustomerId(String customerId) {
        QueryWrapper<Account> queryWrapper=new QueryWrapper();
        queryWrapper.eq("customer_id",customerId);
        return this.accountService.list(queryWrapper);
    }
}

5. sink模块中的打包和解包

不管采用平台内置的RestControlller,还是开发者自己写的RestController,都会进行报文消息的打包和解包。
下例是开发者自己写的RestController,包含了消息打包和消息解包操作:

@RestController
public class Sink2Controller {
    private Converter converter = Converter.getSinkConverter("sink2");
    private Connector connector = Connector.getSinkConnector("sink2");

    @PostMapping(value = "/sink2", consumes = "application/json", produces = "application/json")
    public BizMessage<JSONObject> doService(@RequestBody BizMessage<JSONObject> inMessage, HttpServletResponse response) {
        log.debug("inMessage:{}", inMessage);
        try {
            // 消息打包
            byte[] packedMessage = this.converter.pack(inMessage.getData());
            // 调用connector处理
            byte[] returnMessage = this.connector.process(packedMessage);
            // 消息解包
            JSONObject jsonObject = this.converter.unpack(returnMessage);
            return BizMessage.buildSuccessMessage(inMessage,jsonObject);
        } catch (BizException e) {
            log.error("服务端适配器执行出错",e);
            return BizMessage.buildFailMessage(inMessage,e);
        }
    }
}

在上例中,converter会根据sink2的消息格式配置,调用接口进行消息打包和解包。当然,应在sink.yml中配置指定sink的converter:

- id: sink2
  type: rest
  url: http://bizsip-sample-sink/sink2
  converter:
    type: simple-json
  connector:
    type: sink-bean
    class-name: com.bizmda.bizsip.sample.sink.controller.ActServer

如果开发者想在connector中进行报文处理,可以配置和实现SinkBeanInterface接口的connector,这样就可以直接把RestController中收到的平台JSON报文,不在RestController中进行消息格式转换处理,直接通过connector的process()传入平台JSON报文进行处理。

五、交易日志监控

1. 交易日志监控配置

当app层在application.yml中设置bizsip.rabbitmq-log属性后,平台会把相关的交易日志发送给rabbitMQ消息中间件,发送队列参数:

  • exchange:exchange.direct.bizsip.log
  • routing key:key.bizsip.log

一般需要在以下二类应用启动时,配置application.yml中的bizsip.rabbitmq-log属性:

  • App层的应用:所有的应用都是通过App层应用来进行服务编排的,需要设置rabbitmq-log交易日志监控级别,来对App层的App服务和App延迟服务进行全生命周期的监控;
  • Sink层包括RabbitMQ异步Sink服务的应用:对于涉及RabbitMQ异步Sink服务的应用,也需要单独实现全生命周期的监控,也是需要设置rabbitmq-log交易日志监控级别的。

交易日志包括成功、失败、挂起三种状态,发送交易日志的时机和相关状态如下表:

交易日志状态 发送交易日志时机 收到后建议处理方式
0-App服务成功 1、app层服务执行成功后;
2、app层延迟服务执行成功后。

一般应进行交易成功的后续处理,但是应考虑到App延迟服务和Sink异步服务的情况,延迟服务是作为主服务的子交易(子交易的
parentTraceId为父交易的traceId),而且子交易的交易日志有可能会先于父交易的日志被接收到,应根据先期到达的子交易,进行父交易的后续处理。
1-App服务失败 1、app层服务执行失败后;
2、app层延迟服务执行失败后(不包括服务出现超时错误,并没有超过最大重试次数的情况);
3、app层延迟服务出现超时错误,但超过最大重试次数;

一般应进行交易失败的后续处理。
2-App服务挂起 app层延迟服务放入等待队列,直到被触发执行完成前,都为挂起状态。 一般不做处理,等待交易成功或失败后再行处理。
3-Sink服务成功 sink异步服务执行成功后。 一般应进行交易成功的后续处理,Sink异步服务的交易日志有可能会先于App服务的交易日志被接收到。
4-Sink服务失败 sink异步服务执行失败后。 一般应进行交易失败的后续处理。

bizsip.rabbitmq-log的设置属性包括:

  • success:发送成功、挂起、失败类型的交易日志
  • suspend:发送挂起、失败类型的交易日志
  • fail:发送失败类型的交易日志
  • 不设置或其它:不发送交易日志

接收到的交易日志,为Map数据类型,约定如下:

KEY键 值类型 值说明
type int 0-成功,1-失败,2-挂起
request BizMessage App服务的最初请求报文
response BizMessage App服务的最终响应报文

2. 交易日志监控应用开发

Biz-SIP平台在配置好后,会把相关的交易日志发送给rabbitMQ消息中间件,开发者可以开发RabbitMQ中间件客户端侦听应用,对收到的交易日志进行处理,相关队列侦听参数:

  • exchange:exchange.direct.bizsip.log
  • routing key:key.bizsip.log

交易日志监控应用例子,如下所示:

@Service
public class AppLogQueueListener {
    public static final String APP_LOG_QUEUE = "queue.bizsip.applog";
    private Jackson2JsonMessageConverter jackson2JsonMessageConverter =new Jackson2JsonMessageConverter();

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = APP_LOG_QUEUE, durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = BizConstant.BIZSIP_LOG_EXCHANGE, type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"),
            key = BizConstant.BIZSIP_LOG_ROUTING_KEY))
    public void process(Message message) {
        Map<String,Object> map = (Map<String,Object>)jackson2JsonMessageConverter.fromMessage(message);
        int type = (int)map.get("type");
        BizMessage<JSONObject> inBizMessage = new BizMessage<>((Map) map.get("request"));
        BizMessage<JSONObject> outBizMessage = new BizMessage<>((Map) map.get("response"));
        log.info("\ntype:{}\nrequest:\n{}\nresponse:\n{}",type,
                BizUtils.buildBizMessageLog(inBizMessage),
                BizUtils.buildBizMessageLog(outBizMessage));
        return;
    }
}

附录-1:命名规则

用途 规范 解释
领域层服务(sink) xxx-sink
应用层服务(service) /sink/xxx 通过sink-service透传到领域层的sink服务
/app/yyy 封装的某一app层聚合服务(延迟服务建议和原服务放在一个聚合服务中)
适配层服务(source) xxx-source 涉及到消息格式转换配置时,才需要配置Source ID

附录-2:源码目录结构

├── api-gateway  开放平台网关子模块
├── common       公共包子模块
├── docker          Docker-Compose部署相关文件
├── helm               Helm部署相关文件
├── app-spring-boot-starter      app层Spring Boot Starter子模块
├── source-spring-boot-starter      Source层Spring Boot Starter子模块
├── sink-spring-boot-starter        Sink层Sprint Boot Starter子模块
├── log-spring-boot-starter    Log日志Sprint Boot Starter子模块
├── redis-spring-boot-starter  Redis Sprint Boot Starter子模块
├── sample          Sample子模块
│   ├── config      Sample配置文件主目录
│   ├── sample-app          app层Sample子模块
│   ├── sample-sink         Sink层Sample子模块
│   └── sample-source       Source层Sample子模块
├── source      Source服务接入子模块
│   ├── netty-source        Netty接入Source子模块
│   └── rest-source         Restful接入Source
└── sql
    └── Biz-SIP.sql     Biz-SIP数据库初始化脚本
上一页
下一页