1.在父pom里导入依赖

   <!-- Parent POM: declares managed dependency versions for the modules that inherit from it. -->
   <dependencyManagement>
        <dependencies>
            <!-- Spring Cloud Alibaba BOM, imported (type=pom, scope=import) into this dependencyManagement section. -->
            <!-- NOTE(review): the BOM version is taken from ${spring-cloud.version}; confirm that property really holds a Spring Cloud Alibaba version. -->
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!-- XXL-JOB core library, pinned to ${xxl-job.version}. -->
            <dependency>
                <groupId>com.xuxueli</groupId>
                <artifactId>xxl-job-core</artifactId>
                <version>${xxl-job.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

2.在消息服务、系统配置服务里导入相关依赖

    <!-- RabbitMQ (Spring Boot AMQP starter) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <!-- Nacos service registration and discovery -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <!-- Nacos configuration center -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    </dependency>
    <!-- Bootstrap starter (the services below ship a bootstrap.yml). -->
    <!-- NOTE(review): 3.1.3 is on a different major line from the 4.1.x loadbalancer/openfeign starters below; confirm these versions are meant to be mixed. -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bootstrap</artifactId>
        <version>3.1.3</version>
    </dependency>
    <!-- XXL-JOB core -->
    <!-- NOTE(review): the version is repeated here although the parent POM's dependencyManagement already pins xxl-job-core. -->
    <dependency>
        <groupId>com.xuxueli</groupId>
        <artifactId>xxl-job-core</artifactId>
        <version>${xxl-job.version}</version>
    </dependency>

    <!-- LoadBalancer starter: newer Spring Cloud versions removed the load balancer that came with Ribbon -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-loadbalancer</artifactId>
        <version>4.1.3</version>
    </dependency>

    <!-- OpenFeign starter, used for the declarative HTTP client between the two services -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-openfeign</artifactId>
        <version>4.1.2</version>
    </dependency>

3.在配置文件中添加数据库连接配置、mq连接配置、nacos连接配置、xxl-job配置

系统配置服务:

application-dev.yml
# Dev-profile settings of the system service.
server:
  port: 7090
spring:
  application:
    name: system-service
# Feign settings for the client named message-service.
feign:
  client:
    config:
      message-service:
        logger-level: basic
        connect-timeout: 5000 # connection timeout, default 2s
        read-timeout: 6000 # request processing timeout, default 5s
        request-interceptors:
          - com.zlt.system.interceptor.feign.CustomFeignInterceptor # enable the custom interceptor
# XXL-JOB executor settings; XxlJobConfig injects these keys through @Value("${xxl.*}").
xxl:
  addresses: http://xxx.xxx.xxx.xxx:5719/xxl-job-admin
  accessToken: default_token
  appname: systemJob
  port: 5721
  # NOTE(review): this value names a .log file, yet XxlJobConfig hands it to setLogPath(...); confirm whether a directory is expected.
  logpath: /usr/local/xxl-job-2.4.1/logs/xxl-job-admin.log
  logretentiondays: 30
# Database connection settings.
db:
  # SqlServer Oracle Mysql DaMeng JinCang PostgreSQL HighGo
  type: Mysql
  host: xxx.xxx.xxx.xxx
  port: 3306
  datasource: utils
  username: zlt
  password: password
# Message-queue connection settings.
mq:
  host: xxx.xxx.xxx.xxx
  port: 5672
  username: admin
  password: password
application.yml
# Base settings of the system service; activates the dev profile.
spring:
  profiles:
    active: dev
  servlet:
    multipart:
      max-request-size: 50MB
      max-file-size: 50MB

mybatis-plus:
  mapper-locations: classpath*:/mapper/*Mapper.xml
  # Entity scan; separate multiple packages with a comma or a semicolon
  # NOTE(review): com.example.demo.* differs from the com.zlt.system.mapper package used by @MapperScan; confirm these package names.
  typeAliasesPackage: com.example.demo.entity
  configuration:
    call-setters-on-nulls: false
logging:
  level:
    com.example.demo.mapper: debug
bootstrap.yml
# Bootstrap settings of the system service: application name plus Nacos discovery and config addresses.
spring:
  application:
    name: system-service
  cloud:
    nacos:
      # Service registration / discovery endpoint and namespace.
      discovery:
        server-addr: xxx.xxx.xxx.xxx:8848
        namespace: 5732cade-e127-4e42-bf51-884a1c6c5012
      # Configuration-center endpoint and namespace (same values as discovery above).
      config:
        server-addr: xxx.xxx.xxx.xxx:8848
        namespace: 5732cade-e127-4e42-bf51-884a1c6c5012
        file-extension: yaml
        # Shared Nacos data-ids listed for this service.
        shared-configs:
          - data-id: shared-jdbc.yaml
          - data-id: shared-mq.yaml
          - data-id: shared-redis.yaml
          - data-id: shared-xxljob.yaml

消息中心服务:

application-dev.yml
# Dev-profile settings of the message service.
server:
  port: 7080
spring:
  application:
    name: message-service
# Feign settings for the client named message-service.
# NOTE(review): this service is itself named message-service; confirm the key targets the intended client.
feign:
  client:
    config:
      message-service:
        logger-level: basic
        connect-timeout: 5000 # connection timeout, default 2s
        read-timeout: 6000 # request processing timeout, default 5s
        request-interceptors:
          - com.zlt.message.interceptor.feign.CustomFeignInterceptor # enable the custom interceptor
# XXL-JOB executor settings; XxlJobConfig injects these keys through @Value("${xxl.*}").
xxl:
  addresses: http://xxx.xxx.xxx.xxx:5719/xxl-job-admin
  accessToken: default_token
  appname: messageJob
  port: 5720
  # NOTE(review): this value names a .log file, yet XxlJobConfig hands it to setLogPath(...); confirm whether a directory is expected.
  logpath: /usr/local/xxl-job-2.4.1/logs/xxl-job-admin.log
  logretentiondays: 30
# Database connection settings.
db:
  # SqlServer Oracle Mysql DaMeng JinCang PostgreSQL HighGo
  type: Mysql
  host: xxx.xxx.xxx.xxx
  port: 3306
  datasource: utils
  username: zlt
  password: password
# Message-queue connection settings.
mq:
  host: xxx.xxx.xxx.xxx
  port: 5672
  username: admin
  password: password
application.yml
# Base settings of the message service; activates the dev profile.
spring:
  profiles:
    active: dev
  servlet:
    multipart:
      max-request-size: 50MB
      max-file-size: 50MB

mybatis-plus:
  mapper-locations: classpath*:/mapper/*Mapper.xml
  # Entity scan; separate multiple packages with a comma or a semicolon
  # NOTE(review): com.example.demo.* differs from the com.zlt.message.mapper package used by @MapperScan; confirm these package names.
  typeAliasesPackage: com.example.demo.entity
  configuration:
    call-setters-on-nulls: false
logging:
  level:
    com.example.demo.mapper: debug
bootstrap.yml
# Bootstrap settings of the message service: application name plus Nacos discovery and config addresses.
spring:
  application:
    name: message-service
  cloud:
    nacos:
      # Service registration / discovery endpoint and namespace.
      discovery:
        server-addr: xxx.xxx.xxx.xxx:8848
        namespace: 5732cade-e127-4e42-bf51-884a1c6c5012
      # Configuration-center endpoint and namespace (same values as discovery above).
      config:
        server-addr: xxx.xxx.xxx.xxx:8848
        namespace: 5732cade-e127-4e42-bf51-884a1c6c5012
        file-extension: yaml
        # Shared Nacos data-ids listed for this service.
        shared-configs:
          - data-id: shared-jdbc.yaml
          - data-id: shared-mq.yaml
          - data-id: shared-redis.yaml
          - data-id: shared-xxljob.yaml

4.配置feign客户端

系统模块通过Feign客户端声明消息中心的接口,定时将需要推送的消息通过消息中心提供的接口进行保存。注意在启动类上增加@EnableFeignClients注解,以开启feign功能。

import com.zlt.system.bean.BaseUtil;
import com.zlt.system.dto.MessageDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;

import java.util.List;

/**
 * Feign client for the service named {@code message-service}; all request paths are
 * prefixed with {@code /message}.
 */
@FeignClient(name = "message-service", path = "/message")
public interface MessageFeignClient {

    /**
     * Sends a POST request to {@code /save} (under the {@code /message} prefix) with the
     * given list as the request body.
     *
     * @param list messages to send as the request body
     * @return the response mapped to {@code BaseUtil}
     */
    @PostMapping("/save")
    BaseUtil save(@RequestBody List<MessageDTO> list);

}
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.cloud.openfeign.EnableFeignClients;

/**
 * Boot entry point of the system service.
 *
 * <p>Scans {@code com.zlt.system.mapper} for MyBatis mappers and enables Feign clients.
 * NOTE(review): extending {@code SpringBootServletInitializer} presumably allows deployment
 * as a WAR in an external servlet container - confirm.
 */
@SpringBootApplication
@MapperScan("com.zlt.system.mapper")
// enable Feign
@EnableFeignClients
public class SystemApplication extends SpringBootServletInitializer {

	/** Starts the application with the given command-line arguments. */
	public static void main(String[] args) {
		SpringApplication.run(SystemApplication.class, args);
	}

	/** Registers this class as a source on the supplied builder and returns the result. */
	@Override
	protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
		return builder.sources(SystemApplication.class);
	}

}

消息服务配置MQ和XXL-JOB,实现消息的即时推送

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.cloud.openfeign.EnableFeignClients;

/**
 * Boot entry point of the message service.
 *
 * <p>Scans {@code com.zlt.message.mapper} for MyBatis mappers and enables Feign clients.
 * NOTE(review): extending {@code SpringBootServletInitializer} presumably allows deployment
 * as a WAR in an external servlet container - confirm.
 */
@SpringBootApplication
@MapperScan("com.zlt.message.mapper")
@EnableFeignClients
public class MessageApplication extends SpringBootServletInitializer {

	/** Starts the application with the given command-line arguments. */
	public static void main(String[] args) {
		SpringApplication.run(MessageApplication.class, args);
	}

	/** Registers this class as a source on the supplied builder and returns the result. */
	@Override
	protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
		return builder.sources(MessageApplication.class);
	}

}

import com.zlt.message.commons.BaseUtil;
import com.zlt.message.dto.MessageDTO;
import com.zlt.message.service.MessageService;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.*;

import java.util.List;

/**
 * REST endpoints of the message service, mounted under {@code message}.
 *
 * <p>Handler methods are {@code public}: a private handler is still mapped by Spring MVC, but
 * if the controller is ever wrapped in a class-based proxy the call lands on the proxy
 * instance, whose injected fields are not populated.
 */
@RestController
@RequestMapping("message")
public class MessageController {

    @Resource
    private MessageService messageService;

    /**
     * Prints the given text to standard output and echoes it back to the caller.
     *
     * @param message value of the {@code message} request parameter
     * @return the same text that was received
     */
    @GetMapping("/send")
    public String sendMessage(@RequestParam("message") String message) {
        System.out.println(message);
        return message;
    }

    /**
     * Stores the posted messages through {@link MessageService#saveMessage}.
     *
     * @param list messages received in the request body
     * @return the result of {@code BaseUtil.ok()}
     */
    @PostMapping("/save")
    public BaseUtil save(@RequestBody List<MessageDTO> list) {
        System.out.println("开始存储消息------");
        messageService.saveMessage(list);
        return BaseUtil.ok();
    }

}
    /**
     * Converts the given DTOs to {@code Message} entities and stores them in one batch.
     *
     * <p>A {@code null} or empty list is a no-op.
     *
     * @param list messages to store; may be {@code null} or empty
     */
    @Override
    public void saveMessage(List<MessageDTO> list) {
        // Nothing to persist: avoids an NPE on null and a pointless batch call on empty input.
        if (list == null || list.isEmpty()) {
            return;
        }

        List<Message> saveList = new ArrayList<>(list.size());
        for (MessageDTO messageDTO : list) {
            Message message = new Message();
            // NOTE(review): assumes a (source, target) BeanUtils such as Spring's - confirm the import.
            BeanUtils.copyProperties(messageDTO, message);
            // Every new row starts with isSend = 0 (presumably "not sent yet").
            message.setIsSend(0);
            saveList.add(message);
        }

        this.saveBatch(saveList);
    }

5.配置xxl-job任务

系统配置模块,配置定时任务,定时查询生成的数据,将需要发送消息的数据推送给消息中心服务

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spring configuration that builds the XXL-JOB executor bean from the {@code xxl.*} properties.
 */
@Configuration
public class XxlJobConfig {
    private static final Logger LOG = LoggerFactory.getLogger(XxlJobConfig.class);

    /** Value of {@code xxl.addresses}. */
    @Value("${xxl.addresses}")
    private String schedulerAddresses;

    /** Value of {@code xxl.accessToken}. */
    @Value("${xxl.accessToken}")
    private String token;

    /** Value of {@code xxl.appname}. */
    @Value("${xxl.appname}")
    private String executorAppName;

    /** Value of {@code xxl.port}. */
    @Value("${xxl.port}")
    private int executorPort;

    /** Value of {@code xxl.logpath}. */
    @Value("${xxl.logpath}")
    private String executorLogPath;

    /** Value of {@code xxl.logretentiondays}. */
    @Value("${xxl.logretentiondays}")
    private int retentionDays;

    /**
     * Creates the executor and copies the injected settings onto it.
     * No executor IP is set; the {@code xxl.job.executor.ip} binding is not used.
     *
     * @return the configured executor bean
     */
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        LOG.info(">>>>>>>>>>> xxl-job config init.");

        final XxlJobSpringExecutor executor = new XxlJobSpringExecutor();
        executor.setAdminAddresses(schedulerAddresses);
        executor.setAccessToken(token);
        executor.setAppname(executorAppName);
        executor.setPort(executorPort);
        executor.setLogPath(executorLogPath);
        executor.setLogRetentionDays(retentionDays);
        return executor;
    }

}
/**
 * XXL-JOB handlers of the system service.
 */
@Component
public class SampleXxlJob {
    @Resource
    private PythonLogService pythonLogService;

    @Autowired
    private MessageFeignClient messageFeignClient;

    /**
     * Handler registered as {@code saveMessageHandler}: reads the pending messages from
     * {@code PythonLogService#getMessageList()} and posts them to the message service
     * through {@link MessageFeignClient#save}.
     *
     * <p>When there is nothing to push, the remote call is skipped.
     *
     * @throws Exception declared for compatibility with the original handler signature
     */
    @XxlJob("saveMessageHandler")
    public void saveTaskHandler() throws Exception {
        System.out.println("XXL-JOB, Hello World.");
        List<MessageDTO> messageList = pythonLogService.getMessageList();

        // Nothing to push: skip the remote call rather than posting a null or empty body.
        if (messageList == null || messageList.isEmpty()) {
            return;
        }

        BaseUtil save = messageFeignClient.save(messageList);

        System.out.println(save);
    }
}

消息中心模块,配置定时任务,使用短轮询,查询推送表的数据。将数据存入mq的队列,等待消费

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that exposes the XXL-JOB executor bean, populated from the
 * {@code xxl.*} properties.
 */
@Configuration
public class XxlJobConfig {
    private static final Logger LOG = LoggerFactory.getLogger(XxlJobConfig.class);

    /** Bound to {@code xxl.addresses}. */
    @Value("${xxl.addresses}")
    private String schedulerAddresses;

    /** Bound to {@code xxl.accessToken}. */
    @Value("${xxl.accessToken}")
    private String token;

    /** Bound to {@code xxl.appname}. */
    @Value("${xxl.appname}")
    private String executorAppName;

    /** Bound to {@code xxl.port}. */
    @Value("${xxl.port}")
    private int executorPort;

    /** Bound to {@code xxl.logpath}. */
    @Value("${xxl.logpath}")
    private String executorLogPath;

    /** Bound to {@code xxl.logretentiondays}. */
    @Value("${xxl.logretentiondays}")
    private int retentionDays;

    /**
     * Builds the executor and applies the injected settings to it.
     * No executor IP is set; the {@code xxl.job.executor.ip} binding is not used.
     *
     * @return the configured executor bean
     */
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        LOG.info(">>>>>>>>>>> xxl-job config init.");

        final XxlJobSpringExecutor executor = new XxlJobSpringExecutor();
        executor.setAdminAddresses(schedulerAddresses);
        executor.setAccessToken(token);
        executor.setAppname(executorAppName);
        executor.setPort(executorPort);
        executor.setLogPath(executorLogPath);
        executor.setLogRetentionDays(retentionDays);
        return executor;
    }

}
import com.xxl.job.core.handler.annotation.XxlJob;
import com.zlt.message.service.MessageService;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Component;


/**
 * XXL-JOB handlers of the message service.
 */
@Component
public class SampleXxlJob {
    @Resource
    private MessageService messageService;


    /**
     * Handler registered as {@code sendMessageHandler}: delegates to
     * {@code MessageService#sendMessageHandler()} and then prints a fixed line to standard output.
     *
     * @throws Exception declared by the handler signature; nothing in this body throws it explicitly
     */
    @XxlJob("sendMessageHandler")
    public void sendMessageHandler() throws Exception {

        messageService.sendMessageHandler();

        System.out.println("XXL-JOB, Hello World.");
    }

}
    /**
     * Loads messages through {@code messageMapper.selectList(null)} and publishes each one,
     * serialized as a JSON string, to the exchange and routing key defined in {@code RabbitMQConfig}.
     * Returns without publishing when the query yields nothing.
     *
     * <p>NOTE(review): the query is given no condition and nothing here reads or updates
     * {@code isSend}, so every run looks like it republishes all rows - confirm whether a
     * filter on unsent rows and a sent-flag update are missing.
     */
    @Override
    public void sendMessageHandler() {
        List<Message> messages = messageMapper.selectList(null);
        if (CollectionUtils.isEmpty(messages)){
            return;
        }
        for (Message message:messages){
            // publish to MQ
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, JSONObject.toJSONString(message));
        }
    }

6.配置MQ服务

消息中心服务配置MQ,取出消息,判断需要发送的类型:微信、手机号、钉钉。通过第三方接口进行推送

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the RabbitMQ queue, topic exchange and binding used for message delivery,
 * and exposes their names as constants for producers and consumers.
 */
@Configuration
public class RabbitMQConfig {

    /** Name of the queue consumers listen on. */
    public static final String QUEUE_NAME = "wxMessage.queue";
    /** Name of the topic exchange producers publish to. */
    public static final String EXCHANGE_NAME = "wxMessage.exchange";
    /** Routing key that binds the exchange to the queue. */
    public static final String ROUTING_KEY = "wxMessage.routingKey";

    /**
     * Declares the queue named {@link #QUEUE_NAME}.
     * NOTE(review): the second constructor argument {@code false} is presumably the durable
     * flag, i.e. the queue would not survive a broker restart - confirm this is intended.
     */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, false);
    }

    /** Declares the topic exchange named {@link #EXCHANGE_NAME}. */
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    /**
     * Binds the given queue to the given exchange with {@link #ROUTING_KEY}.
     *
     * @param queue    queue to bind
     * @param exchange exchange to bind the queue to
     * @return the resulting binding
     */
    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}
import com.zlt.message.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * Consumer for the queue named by {@code RabbitMQConfig.QUEUE_NAME}.
 */
@Service
public class RabbitMQConsumer {

    /**
     * Receives one message from the queue and prints it to standard output.
     * As written, no channel-specific (WeChat / phone / DingTalk) dispatch happens here.
     *
     * @param message message payload as a string
     */
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

休闲、娱乐、爱生活