1.在父pom里导入依赖
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${xxl-job.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
2.在消息服务、系统配置服务里导入相关依赖
<!-- rabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- nacos 服务注册发现-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- nacos 配置问题-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
<version>3.1.3</version>
</dependency>
<!-- xxl 配置问题-->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${xxl-job.version}</version>
</dependency>
<!-- loadbalancer启动器 新版本的springcloud去掉了ribbon自带的负载均衡器 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
<version>4.1.3</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<version>4.1.2</version>
</dependency>
3.在配置文件,数据库连接配置、mq连接配置、nacos连接配置、xxl-job配置
系统配置服务:
application-dev.yml
server:
port: 7090
spring:
application:
name: system-service
feign:
client:
config:
message-service:
logger-level: basic
connect-timeout: 5000 # 连接超时时间,默认2s
read-timeout: 6000 #请求处理超时时间,默认5s
request-interceptors:
- com.zlt.system.interceptor.feign.CustomFeignInterceptor # 开启自定义拦截器
xxl:
addresses: http://xxx.xxx.xxx.xxx:5719/xxl-job-admin
accessToken: default_token
appname: systemJob
port: 5721
logpath: /usr/local/xxl-job-2.4.1/logs/xxl-job-admin.log
logretentiondays: 30
db:
# SqlServer Oracle Mysql DaMeng JinCang PostgreSQL HighGo
type: Mysql
host: xxx.xxx.xxx.xxx
port: 3306
datasource: utils
username: zlt
password: password
mq:
host: xxx.xxx.xxx.xxx
port: 5672
username: admin
password: password
application.yml
spring:
profiles:
active: dev
servlet:
multipart:
max-request-size: 50MB
max-file-size: 50MB
mybatis-plus:
mapper-locations: classpath*:/mapper/*Mapper.xml
#实体扫描,多个package用逗号或者分号分隔
typeAliasesPackage: com.example.demo.entity
configuration:
call-setters-on-nulls: false
logging:
level:
com.example.demo.mapper: debug
bootstrap.yml
spring:
application:
name: system-service
cloud:
nacos:
discovery:
server-addr: xxx.xxx.xxx.xxx:8848
namespace: 5732cade-e127-4e42-bf51-884a1c6c5012
config:
server-addr: xxx.xxx.xxx.xxx:8848
namespace: 5732cade-e127-4e42-bf51-884a1c6c5012
file-extension: yaml
shared-configs:
- data-id: shared-jdbc.yaml
- data-id: shared-mq.yaml
- data-id: shared-redis.yaml
- data-id: shared-xxljob.yaml
消息中心服务:
application-dev.yml
server:
port: 7080
spring:
application:
name: message-service
feign:
client:
config:
message-service:
logger-level: basic
connect-timeout: 5000 # 连接超时时间,默认2s
read-timeout: 6000 #请求处理超时时间,默认5s
request-interceptors:
- com.zlt.message.interceptor.feign.CustomFeignInterceptor # 开启自定义拦截器
xxl:
addresses: http://xxx.xxx.xxx.xxx:5719/xxl-job-admin
accessToken: default_token
appname: messageJob
port: 5720
logpath: /usr/local/xxl-job-2.4.1/logs/xxl-job-admin.log
logretentiondays: 30
db:
# SqlServer Oracle Mysql DaMeng JinCang PostgreSQL HighGo
type: Mysql
host: xxx.xxx.xxx.xxx
port: 3306
datasource: utils
username: zlt
password: password
mq:
host: xxx.xxx.xxx.xxx
port: 5672
username: admin
password: password
application.yml
spring:
profiles:
active: dev
servlet:
multipart:
max-request-size: 50MB
max-file-size: 50MB
mybatis-plus:
mapper-locations: classpath*:/mapper/*Mapper.xml
#实体扫描,多个package用逗号或者分号分隔
typeAliasesPackage: com.example.demo.entity
configuration:
call-setters-on-nulls: false
logging:
level:
com.example.demo.mapper: debug
bootstrap.yml
spring:
application:
name: message-service
cloud:
nacos:
discovery:
server-addr: xxx.xxx.xxx.xxx:8848
namespace: 5732cade-e127-4e42-bf51-884a1c6c5012
config:
server-addr: xxx.xxx.xxx.xxx:8848
namespace: 5732cade-e127-4e42-bf51-884a1c6c5012
file-extension: yaml
shared-configs:
- data-id: shared-jdbc.yaml
- data-id: shared-mq.yaml
- data-id: shared-redis.yaml
- data-id: shared-xxljob.yaml
4.配置feign客户端
系统模块配置消息中心的接口,定时将需要推送的消息,通过消息中心提供的接口进行保存。注意在启动类上增加@EnableFeignClients注解,以开启feign功能
import com.zlt.system.bean.BaseUtil;
import com.zlt.system.dto.MessageDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import java.util.List;
@FeignClient(name = "message-service", path = "/message")
public interface MessageFeignClient {
@PostMapping("/save")
BaseUtil save(@RequestBody List<MessageDTO> list);
}
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.cloud.openfeign.EnableFeignClients;
@SpringBootApplication
@MapperScan("com.zlt.system.mapper")
//开启feign
@EnableFeignClients
public class SystemApplication extends SpringBootServletInitializer {
public static void main(String[] args) {
SpringApplication.run(SystemApplication.class, args);
}
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
return builder.sources(SystemApplication.class);
}
}
消息服务配置MQ,XXL-JOB实现消息的即时推送
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.cloud.openfeign.EnableFeignClients;
@SpringBootApplication
@MapperScan("com.zlt.message.mapper")
@EnableFeignClients
public class MessageApplication extends SpringBootServletInitializer {
public static void main(String[] args) {
SpringApplication.run(MessageApplication.class, args);
}
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
return builder.sources(MessageApplication.class);
}
}
import com.zlt.message.commons.BaseUtil;
import com.zlt.message.dto.MessageDTO;
import com.zlt.message.service.MessageService;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@RestController
@RequestMapping("message")
public class MessageController {
@Resource
private MessageService messageService;
@GetMapping("/send")
private String sendMessage(@RequestParam("message") String message){
System.out.println(message);
return message;
};
@PostMapping("/save")
private BaseUtil save(@RequestBody List<MessageDTO> list){
System.out.println("开始存储消息------");
messageService.saveMessage(list);
return BaseUtil.ok();
};
}
@Override
public void saveMessage(List<MessageDTO> list) {
List<Message> saveList=new ArrayList<>();
for (MessageDTO messageDTO:list){
Message message=new Message();
BeanUtils.copyProperties(messageDTO,message);
message.setIsSend(0);
saveList.add(message);
}
this.saveBatch(saveList);
}
5.配置xxl-job任务
系统配置模块,配置定时任务,定时查询生成的数据,将需要发送消息的数据推送给消息中心服务
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.addresses}")
private String adminAddresses;
@Value("${xxl.appname}")
private String appName;
// @Value("${xxl.job.executor.ip}")
// private String ip;
@Value("${xxl.port}")
private int port;
@Value("${xxl.accessToken}")
private String accessToken;
@Value("${xxl.logpath}")
private String logPath;
@Value("${xxl.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appName);
// xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
@Component
public class SampleXxlJob {
@Resource
private PythonLogService pythonLogService;
@Autowired
private MessageFeignClient messageFeignClient;
@XxlJob("saveMessageHandler")
public void saveTaskHandler() throws Exception {
System.out.println("XXL-JOB, Hello World.");
List<MessageDTO> messageList = pythonLogService.getMessageList();
BaseUtil save = messageFeignClient.save(messageList);
System.out.println(save);
}
}
消息中心模块,配置定时任务,使用短轮询,查询推送表的数据。将数据存入mq的队列,等待消费
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.addresses}")
private String adminAddresses;
@Value("${xxl.appname}")
private String appName;
// @Value("${xxl.job.executor.ip}")
// private String ip;
@Value("${xxl.port}")
private int port;
@Value("${xxl.accessToken}")
private String accessToken;
@Value("${xxl.logpath}")
private String logPath;
@Value("${xxl.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appName);
// xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
import com.xxl.job.core.handler.annotation.XxlJob;
import com.zlt.message.service.MessageService;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Component;
@Component
public class SampleXxlJob {
@Resource
private MessageService messageService;
@XxlJob("sendMessageHandler")
public void sendMessageHandler() throws Exception {
messageService.sendMessageHandler();
System.out.println("XXL-JOB, Hello World.");
}
}
@Override
public void sendMessageHandler() {
List<Message> messages = messageMapper.selectList(null);
if (CollectionUtils.isEmpty(messages)){
return;
}
for (Message message:messages){
//存入mq
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, JSONObject.toJSONString(message));
}
}
6.配置MQ服务
消息中心服务配置MQ,取出消息,判断需要发送的类型:微信、手机号、钉钉。通过第三方接口,就行推送
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_NAME = "wxMessage.queue";
public static final String EXCHANGE_NAME = "wxMessage.exchange";
public static final String ROUTING_KEY = "wxMessage.routingKey";
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME, false);
}
@Bean
public TopicExchange exchange() {
return new TopicExchange(EXCHANGE_NAME);
}
@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
}
import com.zlt.message.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class RabbitMQConsumer {
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
Comments | NOTHING