前言 这篇文章希望通过一个简单的demo介绍spring-cloud-alibaba的搭建和使用,让我们对微服务有一个直观的认识;使用的组件有Nacos+Gateway+Sentinel+Seata+RocketMQ,当然本文只介绍最简单的实现,更详细的用法以后再写文章补充。
微服务介绍 微服务是什么 微服务是一种架构风格,将应用程序拆分为小型、独立的功能模块(服务)的开发方式
微服务的特点 每个业务模块实现独立的功能,不限制语言、不限制技术;服务之间通过轻量级的通信机制(如http rest或消息队列)进行交互
微服务的核心思想 解耦应用程序,提升灵活性和维护性
微服务的优点
模块独立解耦
独立部署,快速迭代
灵活的技术栈
高扩展性
高容错性
微服务的缺点
复杂性增加
服务器成本增加
运维成本增加
组件介绍 这么多服务,要如何管理(服务治理、注册中心 nacos,作用:服务发现、注册、过滤、剔除)
这么多服务,它们之间如何通讯(OpenFeign)
这么多服务,外部(前端)如何访问它们(网关 gateway)
这么多服务,如果某一个服务报错了,应该如何解决(sentinel)
这么多服务,如何保证事务的原子性(seata)
系统架构图 我们实现一个简单的微服务,根据订单扣减库存和余额,同时可以给商品点赞
从架构图不难看出,我们将实现一个简单的微服务系统,将实现以下功能:
根据订单扣减库存和账户余额(实现分布式事务)
给商品点赞(实现限流、流量削峰)
依赖版本
JDK8或更高
SpringBoot 2.3.2.RELEASE 、SpringCloud Hoxton.SR9、SpringCloudAlibaba 2.2.6.RELEASE
关于spring-cloud-alibaba中各组件的版本,可以参看:https://github.com/alibaba/spring-cloud-alibaba/wiki/%E7%89%88%E6%9C%AC%E8%AF%B4%E6%98%8E
软件安装 网络 在部署组件之前必须强调网络问题,因为这些组件的服务端我全部使用docker-compose部署(单主机),而docker-compose中容器的网络是默认独立的(默认每个docker-compose.yml文件启动的容器都有自己独立的网络,通过桥接(brigde)的方式和宿主机连接);这就使得我们在容器中不能用localhost或者127.0.0.1来访问其他容器(这样访问的是容器自身);当然我们的容器会暴露接口到宿主机,我们似乎也可以通过访问宿主机ip的方式来访问其他的容器,但是这样在宿主机ip变更时,我们就需要修改多个地方,如果后续部署的服务变得复杂后会很麻烦;
那么容器之间要怎么通信呢?如果我们的宿主机是集群,那么就要上k8s了,但是我们现在是还只是测试,使用的是单主机,所有的应用都在一台电脑上跑,这样我们可以在docker中简单的建立一个网路,在部署容器的时候将这些容器都放在这个网络之下,那么我们就可以使用容器的名称来代替容器的ip,docker会自动帮我们解析;这个方式实践来看是最方便的,比较容易实现
在这次的部署中,我在docker中建立的网络叫‘spring_cloud_alibaba_network’,驱动类型为桥接(bridge),在命令行中输入命令:
1 docker network create --driver bridge spring_cloud_alibaba_network
创建完毕后可以通过命令查看:
创建组件时在每个容器的docker-compose.yml文件中添加:
1 2 3 4 5 6 7 services: networks: - spring_cloud_alibaba_network networks: spring_cloud_alibaba_network: external: true
1. nacos安装 这里使用docker-compose方式安装nacos,docker-compose.yml文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 services: nacos: image: nacos/nacos-server:latest container_name: nacos networks: - spring_cloud_alibaba_network ports: - 8848 :8848 - 9848 :9848 - 9849 :9849 privileged: true environment: MODE: standalone TZ: Asia/Shanghai SPRING_DATASOURCE_PLATFORM: mysql MYSQL_SERVICE_HOST: ip MYSQL_SERVICE_PORT: 3306 MYSQL_SERVICE_USER: root MYSQL_SERVICE_PASSWORD: password MYSQL_SERVICE_DB_NAME: nacos_config MYSQL_SERVICE_DB_PARAM: characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true volumes: - ./logs/:/home/nacos/logs networks: spring_cloud_alibaba_network: external: true
使用命令“docker compose up -d”安装nacos服务,访问“http://localhost:8848/nacos”可以看到nacos服务已经启动
2. seata安装 seata的配置比较复杂,不同版本的变化比较大,我们需要设置的主要是这么几项:
服务发现:是否要将seata服务注册到服务发现中心,新版在application.yml文件的registry模块中设置,旧版在registry.conf文件的registry模块中设置
配置中心:是否要将seata服务的配置文件放到配置中心,新版在application.yml文件的config模块中设置,旧版在registry.conf文件的config模块中设置(如果我们设置将配置放到配置中心,那么一些配置就不用在本地重复配置了)
数据保存:是否要将seata服务的数据保存到数据库,新版在application.yml文件的store模块中设置,旧版在file.conf文件的store模块中设置(如果配置放到配置中心,本地就不需要配置了)
那么这些不同的配置有什么区别呢?如果我们只是使用单体版的seata服务,这三项我们就可以设置为‘file’,以文件的形式保存数据,舍弃服务发现和配置中心;而如果要集群的方式来部署seata,我们就必须使用使用nacos等服务发现和配置中心,数据也必须保存到mysql、redis等数据库;为什么呢?因为每一台机器都在本地保存自己的file,集群的机器有物理、网络的隔离,相互是无法共享本地配置和数据的,这时候就要将数据保存到一个统一、大家都能获取的地方
(1)安装latest版本,使用nacos做服务发现和配置中心,mysql做数据库 这里我们选择将其数据持久化到MySQL,同时使用nacos作为服务发现和配置中心(将数据库连接相关的配置保存到nacos),因此在安装了MySQL和Nacos后我们再安装Seata
a. 获取seata的sql脚本(sql脚本变化不大,我直接贴在这里),创建数据库seata_config,并在其中运行这个sql脚本创建数据表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 CREATE TABLE IF NOT EXISTS `global_table`( `xid` VARCHAR (128 ) NOT NULL , `transaction_id` BIGINT , `status` TINYINT NOT NULL , `application_id` VARCHAR (32 ), `transaction_service_group` VARCHAR (32 ), `transaction_name` VARCHAR (128 ), `timeout` INT , `begin_time` BIGINT , `application_data` VARCHAR (2000 ), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`xid`), KEY `idx_status_gmt_modified` (`status` , `gmt_modified`), KEY `idx_transaction_id` (`transaction_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; CREATE TABLE IF NOT EXISTS `branch_table`( `branch_id` BIGINT NOT NULL , `xid` VARCHAR (128 ) NOT NULL , `transaction_id` BIGINT , `resource_group_id` VARCHAR (32 ), `resource_id` VARCHAR (256 ), `branch_type` VARCHAR (8 ), `status` TINYINT, `client_id` VARCHAR (64 ), `application_data` VARCHAR (2000 ), `gmt_create` DATETIME(6 ), `gmt_modified` DATETIME(6 ), PRIMARY KEY (`branch_id`), KEY `idx_xid` (`xid`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; CREATE TABLE IF NOT EXISTS `lock_table`( `row_key` VARCHAR (128 ) NOT NULL , `xid` VARCHAR (128 ), `transaction_id` BIGINT , `branch_id` BIGINT NOT NULL , `resource_id` VARCHAR (256 ), `table_name` VARCHAR (32 ), `pk` VARCHAR (36 ), `status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking' , `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`row_key`), KEY `idx_status` (`status`), KEY `idx_branch_id` (`branch_id`), KEY `idx_xid` (`xid`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; CREATE TABLE IF NOT EXISTS `distributed_lock`( `lock_key` CHAR (20 ) NOT NULL , `lock_value` VARCHAR (20 ) NOT NULL , `expire` BIGINT , primary key (`lock_key`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting' , ' ' , 0 );INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting' , ' ' , 0 );INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking' , ' ' , 0 );INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck' , ' ' , 0 );
b. 获取seata的配置文件,这有两种形式,可以直接下载,在resources目录中(相对老的版本在conf目录)获取,或者用docker-compose拉取镜像并安装一个容器,从容器中复制出来
1 2 # 复制命令 docker cp 容器id:/seata-server/resources/* ./
在合适的目录下创建docker-compose.yml,并在同一目录下创建resources文件夹,将刚才获取的所有配置文件放入其中,配置文件的结构为:
1 2 3 4 5 6 7 8 seata-server/ ├── classes/ └── libs/ └── logs/ └── resources/ └── application.yml └── application.example.yml └── sessionStore/
c. docker-compose.yml内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 services: seata-server: image: seataio/seata-server:latest container_name: seata-server ports: - "7091:7091" - "8091:8091" environment: - SEATA_IP=192.168.124.7 volumes: - ./resources:/seata-server/resources - ./sessionStore:/seata-server/sessionStore networks: - spring_cloud_alibaba_network networks: spring_cloud_alibaba_network: external: true
这里可以看到我们映射了7091和8091两个端口,我们用的是腾讯云服务器做内网穿透,要记得打开这连个端口
我们将resources和sessionStore映射到docker-compose.yml同级的文件夹下方便管理
d. 我们进入resources文件夹,刚才我们获取的配置文件都在这里,我们主要需要配置的是application.yml文件,可以参考application.example.yml文件配置(这里面有所有情况的配置示例),具体配置的内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 server: port: 7091 spring: application: name: seata-server logging: config: classpath:logback-spring.xml file: path: ${log.home:${user.home}/logs/seata} extend: logstash-appender: destination: 127.0 .0 .1 :4560 kafka-appender: bootstrap-servers: 127.0 .0 .1 :9092 topic: logback_to_logstash console: user: username: admin password: admin seata: security: secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017 tokenValidityInMilliseconds: 1800000 ignore: urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/health,/error config: type: nacos nacos: server-addr: 192.168 .124 .7 :8848 namespace: namespaceId group: DEFAULT_GROUP username: password: data-id: seataServer.yaml registry: type: nacos nacos: application: seata-server server-addr: 192.168 .124 .7 :8848 group: DEFAULT_GROUP namespace: namespaceId cluster: default username: password:
我们可以看到,application.yml中有很多配置项;这里需要说明的是,因为我们使用了nacos配置中心,我们可以将nacos配置中心和服务发现以外的配置都放在nacos中
e. 根据上一步配置的nacos配置中心的内容,在nacos页面的配置中心新建命名空间‘seata-server’,在其中新建配置文件‘seataServer’,Group为‘DEFAULT_GROUP’,格式为‘yaml’,我尽量为这些配置写了注解,内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 seata: store: mode: db db: datasource: druid db-type: mysql driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://192.168.124.7:3306/seata_config?rewriteBatchedStatements=true user: root password: 993972 min-conn: 10 max-conn: 100 global-table: global_table branch-table: branch_table lock-table: lock_table distributed-lock-table: distributed_lock query-limit: 1000 max-wait: 5000 server: recovery: committing-retry-period: 1000 async-committing-retry-period: 1000 rollbacking-retry-period: 1000 timeout-retry-period: 1000 undo: log-save-days: 7 log-delete-period: 86400000 session: branch-async-queue-size: 5000 enable-branch-async-remove: false service-port: 8091 max-commit-retry-timeout: -1 max-rollback-retry-timeout: -1 rollback-failed-unlock-enable: false enable-check-auth: true enable-parallel-request-handle: true retry-dead-threshold: 130000 xaer-nota-retry-timeout: 60000 enableParallelRequestHandle: true
f. 现在可以运行命令“docker compose up -d”启动seata,访问”http://localhost:7091",若能进入seata登录页面证明安装成功
(2)安装1.3.0版本(单体版) a. 这里先说一下这个版本的docker容器内的目录结构
1 2 3 4 5 6 7 8 9 seata-server/ ├── classes/ ├── conf/ ├── registry.conf ├── file.conf └── libs/ └── logs/ └── resources/ └── sessionStore/
我们的配置文件便是‘/seata-server/conf/registry.conf’和‘/seata-server/conf/file.conf’,‘resources’文件夹在这里不用管;我们将registry.conf文件的registry、config模块,file.conf文件的store模块都设置为‘file’;
b. 在合适的地方新建文件夹‘seata-1.3.0’,在其中新建docker-compose.yml文件,内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 services: seata-server: image: seataio/seata-server:1.3.0 container_name: seata-server networks: - spring_cloud_alibaba_network ports: - "8091:8091" environment: - SEATA_CONFIG_NAME=file:/seata-server/conf/registry - SEATA_IP=192.168.124.7 - SEATA_PORT=8091 volumes: - ./sessionStore:/seata-server/sessionStore - ./conf:/seata-server/conf - ./logs:/seata-server/logs networks: spring_cloud_alibaba_network: external: true
c. 在‘seata-1.3.0’文件夹中新建‘conf’文件夹,在其中新建’registry.conf’和‘file.conf’,内容为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 ## registry.conf registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "file" nacos { application = "seata-server" serverAddr = "ip:port" group = "DEFAULT_GROUP" namespace = "namespaceId" cluster = "default" username = "" password = "" } eureka { serviceUrl = "http://localhost:8761/eureka" application = "default" weight = "1" } redis { serverAddr = "localhost:6379" db = 0 password = "" cluster = "default" timeout = 0 } zk { cluster = "default" serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } consul { cluster = "default" serverAddr = "127.0.0.1:8500" } etcd3 { cluster = "default" serverAddr = "http://localhost:2379" } sofa { serverAddr = "127.0.0.1:9603" application = "default" region = "DEFAULT_ZONE" datacenter = "DefaultDataCenter" cluster = "default" group = "SEATA_GROUP" addressWaitTime = "3000" } file { name = "file.conf" } } config { # file、nacos 、apollo、zk、consul、etcd3 type = "file" nacos { serverAddr = "ip:port" namespace = "namespaceId" group = "DEFAULT_GROUP" username = "" password = "" } consul { serverAddr = "127.0.0.1:8500" } apollo { appId = "seata-server" apolloMeta = "http://192.168.1.204:8801" namespace = "application" } zk { serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } etcd3 { serverAddr = "http://localhost:2379" } file { name = "file.conf" } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 ## file.conf ## transaction log store, only used in seata-server store { ## store mode: file、db、redis mode = "file" ## file store property file { ## store location dir dir = "sessionStore" # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions maxBranchSessionSize = 16384 # globe session size , if exceeded throws exceptions maxGlobalSessionSize = 512 # file buffer size , if exceeded allocate new buffer fileWriteBufferCacheSize = 16384 # when recover batch read size sessionReloadReadSize = 100 # async, sync flushDiskMode = async } ## database store property db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc. datasource = "druid" ## mysql/oracle/postgresql/h2/oceanbase etc. dbType = "mysql" driverClassName = "com.mysql.cj.jdbc.Driver" url = "jdbc:mysql://ip:port/seata_config" user = "user" password = "password" minConn = 5 maxConn = 30 globalTable = "global_table" branchTable = "branch_table" lockTable = "lock_table" queryLimit = 100 maxWait = 5000 } ## redis store property redis { host = "127.0.0.1" port = "6379" password = "" database = "0" minConn = 1 maxConn = 10 queryLimit = 100 } }
d. 使用命令“docker compose up -d”启动服务,端口号即8091
注:使用seata-1.3.0版本,再使用热部署依赖“spring-boot-devtools”,项目就会报错“fileListener execute error:null”,但是功能可以正常使用
3. sentinel安装 sentinel的安装部署首先要特别注意网络问题,要确保sentinel服务端能和被保护的服务正常通信,如果与被保护的服务不能正常通信,就会导致一系列的问题(规则失效、报错等),所以要确保sentinel服务端与被保护的服务能正常通信
官方在github上提供的是sentinel的源码和jar包,在本地启动测试没什么问题,但因为没有做数据的持久化,每次重启服务之前的配置就会消失;要在生产环境使用并做数据的持久化,需要配置服务发现等一系列组件,可以参看:https://cloud.benym.cn/pages/12a5d6/#%E8%83%8C%E6%99%AF
(1)直接启动jar包 我们可以用命令行直接启动sentinel的jar包
1 java -jar sentinel-dashboard-1.8.0.jar
访问“http://localhost:8080/”,账号密码默认为“sentinel”
(2)用docker-compose启动 在合适的地方创建文件夹‘sentinel’,在其中创建Dockerfile文件,这个文件是用来构建docker镜像的,可以看到我们设置的接口是8718
1 2 3 4 5 FROM openjdk:8 -jreMAINTAINER qiuliCOPY ./sentinel-dashboard-1.8.0.jar /app.jar EXPOSE 8718 ENTRYPOINT ["java" , "-jar" , "app.jar" ]
创建docker-compose.yml文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 services: sentinel: build: context: ./ dockerfile: ./Dockerfile image: sentinel container_name: sentinel restart: no networks: - spring_cloud_alibaba_network ports: - "8718:8718" logging: driver: "json-file" options: max-size: "10m" max-file: "1" volumes: - "./data:/root/data" - "./logs:/root/logs" command: [ "--server.port=8718" , "--logging.file.path=/app-logs" ] networks: spring_cloud_alibaba_network: external: true
运行命令‘docker compose up -d’构建镜像并启动容器,访问‘http://localhost:8718'就可以访问sentinel的首页,默认账号密码是’sentinel‘
4. rocketmq安装 我们同样使用docker-compose部署rocketmq,可以参考:
docker-compose.yml文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 services: namesrv: image: apache/rocketmq:4.4.0 container_name: rmqnamesrv networks: - spring_cloud_alibaba_network ports: - 9876 :9876 command: sh mqnamesrv broker: image: apache/rocketmq:4.4.0 container_name: rmqbroker networks: - spring_cloud_alibaba_network ports: - 10909 :10909 - 10911 :10911 - 10912 :10912 environment: - NAMESRV_ADDR=rmqnamesrv:9876 volumes: - ./conf:/home/rocketmq/rocketmq-4.4.0/conf depends_on: - namesrv command: sh mqbroker -c /home/rocketmq/rocketmq-4.4.0/conf/broker.conf rocketmq-dashboard: image: apacherocketmq/rocketmq-dashboard:latest container_name: rocketmq-dashboard networks: - spring_cloud_alibaba_network ports: - 8082 :8080 environment: - NAMESRV_ADDR=rmqnamesrv:9876 depends_on: - namesrv - broker networks: spring_cloud_alibaba_network: external: true
可以看到我们部署了3个服务,‘rmqnamesrv’、‘rmqbroker’和‘rocketmq-dashboard’;其中,‘rmqnamesrv’是类似‘nacos’的负责服务发现和路由管理的服务,‘rmqbroker’负责消息的存储、投递和管理,‘rocketmq-dashboard’提供可视化的监控和管理功能;
根据docker-compose.yml文件,我们将服务‘rmqbroker’的conf文件夹挂载到宿主机中docker-compose.yml文件所在的文件夹,方便我们对rmqbroker的配置(可以到github下载源码,其中就有conf文件夹及其中的文件)
conf文件夹中的文件
可以看到其中有很多配置模版(2m-2s-xx),rmqbroker的配置文件为broker.conf,其中需要添加配置‘brokerIP1 = 宿主机IP’,这个ip是broker服务对外暴露的 IP 地址,用于对外提供服务,添加后的broker.conf文件为:
1 2 3 4 5 6 7 8 9 10 # 宿主机IP,用于被外界访问 brokerIP1 = 宿主机IP brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH
在配置好这两个文件后运行命令‘docker compose up -d’,即可启动容器;
容器启动后在宿主机访问‘http://localhost:8082
’(我们在docker-compose.yml文件中将宿主机的8082端口映射到容器的8080端口),可以看到服务正确启动
项目实现 1. 新建工程 新建父工程‘spring-cloud-alibaba-study’,并在其中新建子工程‘service-order’(订单服务)、‘service-storage’(库存服务)、‘service-account’(账户服务)
2. 添加依赖 在父工程中添加springboot、springcloud、springcloudalibaba依赖如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 <dependencyManagement> <dependencies> <!--spring boot 2.3.2.RELEASE--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>2.3.2.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> <!--spring cloud Hoxton.SR9--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Hoxton.SR9</version> <type>pom</type> <scope>import</scope> </dependency> <!--spring cloud alibaba 2.2.6.RELEASE--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.2.6.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
3. nacos服务发现 (1)在pom文件中添加nacos服务发现依赖
1 2 3 4 5 6 7 <dependencies> <!-- nacos服务发现 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> </dependencies>
(2)在application.yml 文件中添加nacos服务发现地址
1 2 3 4 5 spring: cloud: nacos: discovery: server-addr: http://nacosip:8848
(3)在启动类上添加服务发现注解
1 2 3 4 5 6 7 @SpringBootApplication @EnableDiscoveryClient public class Application { public static void main (String[] args) { SpringApplication.run(Application.class, args); } }
4. nacos配置中心 (1)在pom文件中添加nacos配置中心依赖
1 2 3 4 5 <!-- nacos配置中心 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency>
(2)登录nacos界面,在‘命名空间’中新建命名空间‘spring-cloud-alibaba-study’,然后在‘配置管理’-‘配置列表’对应的命名空间下创建配置‘service-order.yml’,在其中添加配置‘author:zhangsan’
(3)将项目中的application.yml 文件改为bootstarp.yml文件,并在其中添加nacos服务发现地址
1 2 3 4 5 6 7 8 9 spring: cloud: nacos: config: server-addr: http://nacosip:8848 prefix: service-order file-extension: yaml group: DEFAULT_GROUP namespace: 49b762ac-54b1-4aff-b17f-6e6a32cfe5b8
(4)编写测试接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Slf4j @RestController @RequestMapping("/test") public class TestController { @Value("${author}") private String author; @GetMapping("/getNacosConfig") public String getNacosConfig () { log.info("author:{}" , author); return author; } }
启动服务,调用测试接口即可获取nacos页面中添加的配置信息
给测试类添加注解‘@RefreshScope’,在nacos网页修改配置信息,项目中就能实时生效
注意:
对于涉外服务,建议使用**bootstrap.yml
**设置其配置,因为这一文件在项目启动时的优先级高于application.yml文件,其在配置服务中心和发现服务时能及时获取到正确的配置;
namespace需要填写命名空间的id,可以在nacos网页的‘命名空间’里面查看到;
更高版本的nacos可以使用更简便的方式引入配置
1 2 3 4 5 spring: cloud: config: import: - nacos:order-server.yaml
配置文件的名称可以和‘spring.profiles.active’配合使用,指定在不同环境中使用不同配置文件
比如设置,比如在bootstarp.yml中设置的是‘dev’,则在nacos中生效的是‘application-dev.yaml’
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 spring: profiles: active: dev cloud: nacos: config: server-addr: nacosip:8848 prefix: application file-extension: yaml group: DEFAULT_GROUP namespace: 49b762ac-54b1-4aff-b17f-6e6a32cfe5b8
5. OpenFeign和Loadbalancer实现服务远程调用和负载均衡 (1)实现‘service-stock’服务的扣减库存接口
参考搭建springboot+mybatis-plus项目 ,利用代码生成器搭建‘service-stock’项目,实现增删改查;
实现扣减库存接口
1 2 3 4 5 6 7 8 public interface StockMapper extends BaseMapper<Stock> { /** * 扣减库存 * * @param productId */ boolean reduceStock(@Param("product_id") Integer productId); }
1 2 3 4 5 6 <update id ="reduceStock" parameterType ="integer" > update stock set `count`=`count` - 1 where product_id = #{product_id} </update >
1 2 3 4 5 6 7 8 9 public interface StockService extends IService <Stock> { boolean reduceStock (Integer productId) ; }
1 2 3 4 5 6 7 8 9 10 @Service public class StockServiceImpl extends ServiceImpl <StockMapper, Stock> implements StockService { @Resource private StockMapper stockMapper; @Override public boolean reduceStock (Integer productId) { return stockMapper.reduceStock(productId); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @RestController @RequestMapping("/stock") public class StockController { @Resource private StockService stockService; @PostMapping("/reduceStock") public Result reduceStock (@RequestParam Integer productId) { boolean flag = stockService.reduceStock(productId); if (flag) { return Result.ok(); } else { return Result.error(); } } }
(2)利用OpenFeign实现‘service-order’服务调用‘service-stock’服务的扣减库存接口
参考搭建springboot+mybatis-plus项目 ,利用代码生成器搭建‘service-order’项目,实现增删改查
引入OpenFeign和loadbalancer,依赖如下:
1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-openfeign</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-loadbalancer</artifactId > </dependency >
这里需要说明的是,使用OpenFeign调用其他微服务,只要引入loadbalancer依赖即可实现负载均衡,如果使用RestTemplate方式调用,则需要添加注册类
1 2 3 4 5 6 7 8 @Configuration public class OrderRestTemplateConfig { @LoadBalanced @Bean public RestTemplate getRestTemplate () { return new RestTemplate (); } }
创建OpenFeign接口,这里的方法就是要调用服务的controller中的方法,入参、返回值和路径保持一致(直接复制粘贴就好)
1 2 3 4 5 6 7 8 9 10 11 12 # name是spring.application.name的配置项 @FeignClient(name = "service-stock", path = "/stock") public interface StockOpenFeignClient { @PostMapping("/reduceStock") Result reduceStock (@RequestParam("productId") Integer productId) ; }
在启动类上添加注解
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableDiscoveryClient @EnableFeignClients public class ServiceOrderApplication { public static void main (String[] args) { SpringApplication.run(ServiceOrderApplication.class, args); } }
现在可以使用OpenFeignClient远程调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Slf4j @RestController @RequestMapping("/order") public class OrderController { @Resource private StockOpenFeignClient stockOpenFeignClient; @PostMapping("/add") public Result addOrder (@RequestParam("productId") Integer productId) { Result reduceStockResult = stockOpenFeignClient.reduceStock(productId); if (Result.ok().getCode().equals(reduceStockResult.getCode())) { log.info("扣减库存成功" ); return Result.ok().message("扣减库存成功" ); } return Result.error(); } }
6. gateway实现网关调用 (1)新建gateway模块
(2)添加依赖
- 注:我们的依赖中添加了负载均衡组件‘spring-cloud-starter-loadbalancer’,如果不添加该组件,则配置的转发路径‘uri: lb://xxx’中的负载均衡不会生效
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-gateway</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-discovery</artifactId > </dependency > <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-nacos-config</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-loadbalancer</artifactId > </dependency >
(3)新建‘bootstrap.yml’配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 server: port: 8088 spring: application: name: gateway cloud: nacos: discovery: server-addr: nacosip:8848 config: server-addr: nacosip:8848 prefix: gateway file-extension: yaml group: DEFAULT_GROUP namespace: 49b762ac-54b1-4aff-b17f-6e6a32cfe5b8
(4)在nacos页面新建gateway.yaml配置,添加配置信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 spring: cloud: gateway: routes: - id: order_route uri: lb://service-order predicates: - Path=/order/**
(5)在启动类添加注解@EnableDiscoveryClient启动nacos服务发现(nacos配置中心不需要额外添加注解)
1 2 3 4 5 6 7 @EnableDiscoveryClient @SpringBootApplication public class GatewayApplication { public static void main (String[] args) { SpringApplication.run(GatewayApplication.class, args); } }
(6)启动项目,现在所有外部请求都先访问网关gateway,断言匹配成功将会转发到其他微服务
7. seata实现分布式事务 (1)新建数据表
对于需要实现分布式事务的微服务,都需要在其数据库中新建一张数据表,这张表中的数据即是回滚日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 USE `databasename`; CREATE TABLE `undo_log` ( `id` bigint (20 ) NOT NULL AUTO_INCREMENT, `branch_id` bigint (20 ) NOT NULL , `xid` varchar (100 ) NOT NULL , `context` varchar (128 ) NOT NULL , `rollback_info` longblob NOT NULL , `log_status` int (11 ) NOT NULL , `log_created` datetime NOT NULL , `log_modified` datetime NOT NULL , PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE= InnoDB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;
(2)引入seata依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-seata</artifactId > </dependency > # 如果与服务端版本不同,要统一版本,可以这么配置;先排除客户端版本,再引入与服务端一致的客户端 # 这么做有一定的风险,如果引起错误只能放弃这种处理方式,转而安装与客户端相同的服务端 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-seata</artifactId > <exclusions > <exclusion > <artifactId > io.seata</artifactId > <groupId > seata-spring-boot-starter</groupId > </exclusion > </exclusions > </dependency > <dependency > <groupId > io.seata</groupId > <artifactId > seata-spring-boot-starter</artifactId > <version > 1.3.0</version > </dependency >
(3)添加配置
seata的配置可以添加到本地的配置文件application.yml,也可以添加到我们在之前使用的nacos配置中心,我们这次将它添加到nacos配置中心
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 seata: enabled: true application-id: ${spring.application.name} tx-service-group: my-tx-group service: vgroup-mapping: my-tx-group: default grouplist: default: ip:8091,host1:port1,host2:port2 registry: type: file config: type: file
(4)在需要使用分布式事务的方法上添加注解@GlobalTransactional
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @GlobalTransactional @Override public boolean createOrder (Order order) { storageFeignClient.reduceStorage(order.getProductId(), order.getCount()); accountFeignClient.reduceMoney(order.getUserId(), order.getMoney()); int inserted = orderMapper.insert(order); int i = 1 / 0 ; if (inserted > 0 ) { return true ; } else { return false ; } }
(5)再次测试,可以看到,所有数据都回滚了
(6)这里使用的是seata的AT模式,只能回滚关系型数据库,要回滚nosql数据库的事务,要使用tcc模式,这里贴一段OpenAI的介绍:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 在 Seata 的 AT(Automatic Transaction)模式下,不能直接回滚 Redis 或其他 NoSQL 数据库。AT 模式是基于传统关系型数据库的事务机制来实现分布式事务的,它使用了 undo log 和 branch transaction 来支持本地事务的回滚和恢复。具体来说,它依赖于数据库支持的事务特性(例如,MySQL、Oracle 等数据库的事务回滚)以及与数据库的交互方式,而 NoSQL 数据库(如 Redis、MongoDB 等)并不具备原生的事务回滚机制,因此 Seata 的 AT 模式无法直接对 NoSQL 数据库进行回滚。 1. AT 模式如何工作 在 AT 模式下,Seata 会在数据库操作(如 SQL 操作)期间进行以下步骤: 记录 undo log:Seata 会记录操作前的状态,这样当事务需要回滚时,可以通过 undo log 恢复数据。 事务分支:当应用程序执行事务操作时,Seata 会为每个参与的分支记录事务状态。 提交/回滚:在分布式事务提交或回滚时,Seata 会依赖这些 undo log 来进行恢复操作,确保本地事务的一致性。 这些操作依赖于关系型数据库的事务支持,尤其是它们的 ACID 特性。 2. NoSQL 数据库的局限性 Redis:Redis 是一个键值存储系统,并不支持多命令的原子事务,也没有像关系型数据库那样的 undo log 机制。Redis 支持 MULTI/EXEC 事务,但它并不支持回滚操作,因此不能通过 AT 模式回滚 Redis 操作。 MongoDB:MongoDB 从 4.0 开始支持多文档事务,但它的事务机制与传统关系型数据库不同,而且 Seata AT 模式并不直接支持 MongoDB 的事务回滚。虽然 MongoDB 提供了某些事务特性,但 Seata 并未集成这些特性来支持 NoSQL 数据库的分布式事务回滚。 其他 NoSQL 数据库(例如 Cassandra、HBase 等):类似于 Redis,这些数据库通常也没有回滚功能,或者它们的事务特性与关系型数据库完全不同,因此不能在 Seata 的 AT 模式中直接回滚。 3. 如何处理 NoSQL 数据库中的分布式事务 如果你使用的是 Redis 或其他 NoSQL 数据库,你可以考虑以下几种解决方案: TCC 模式:Seata 提供了 TCC(Try-Cancel/Confirm)模式,这是另一种分布式事务模式,可以通过应用层的业务逻辑进行补偿和回滚,适合更复杂的操作,特别是在不支持事务回滚的 NoSQL 数据库中。TCC 模式要求你手动实现“Try”(尝试)、“Cancel”(取消)和“Confirm”(确认)操作,适用于更细粒度的事务控制。 手动补偿机制:如果你必须在 Redis 或其他 NoSQL 数据库中执行事务操作并确保一致性,可以在应用程序中实现手动补偿逻辑。例如,当事务失败时,可以手动删除或恢复 Redis 中的数据。 使用 XA 模式:Seata 也支持 XA(即二阶段提交协议)模式,适用于支持 XA 事务的数据库。虽然 NoSQL 数据库通常不支持 XA 事务,但如果你的架构中包含支持 XA 的数据库,可以考虑结合使用。 分布式事务管理器:你可以将 Redis 或其他 NoSQL 数据库的操作与关系型数据库事务结合起来,使用分布式事务管理器(如 Seata)进行统一管理,但 Redis 等 NoSQL 数据库依然没有事务回滚的能力,需通过补偿或自定义逻辑处理一致性问题。
关于如何实现tcc模式,可以参看:https://caochenlei.blog.csdn.net/article/details/114054167,但是似乎需要更高版本的seata(1.4.0)才能使用注解@TwoPhaseBusinessAction
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @LocalTCC public interface AccountService { @TwoPhaseBusinessAction(name = "decreaseMoney", commitMethod = "commitDecreaseMoney", rollbackMethod = "rollbackDecreaseMoney") void decrease (@BusinessActionContextParameter(paramName = "userId") Long userId, @BusinessActionContextParameter(paramName = "money") BigDecimal money) ; boolean commitDecreaseMoney (BusinessActionContext context) ; boolean rollbackDecreaseMoney (BusinessActionContext context) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Service public class AccountServiceImpl implements AccountService { @Resource private AccountMapper accountMapper; @Override public void decrease (Long userId, BigDecimal money) { accountMapper.decrease(userId, money); } @Override public boolean commitDecreaseMoney (BusinessActionContext context) { return true ; } @Override public boolean rollbackDecreaseMoney (BusinessActionContext context) { String userId = context.getActionContext("userId" ).toString(); String money = context.getActionContext("money" ).toString(); accountMapper.increase(new Long (userId), new BigDecimal (money)); System.out.println("数据回滚了,这可真的好" ); return true ; } }
8. sentinel实现限流熔断 关于sentinel的使用方式比较多,这里只介绍最简单的限流规则,更多的sentinel的规则配置,可以看另一篇博客;
注:另外要说的是,sentinel现在开源的版本功能是不完全的,阿里有商业版的AHAS Sentinel;我们要使用开源版本,需要注意几点:1. 数据持久化,直接启动jar包的sentinel服务数据是没有持久化的,我们的配置的各种限流规则在服务重启后就会自动丢失,所以我们首先需要将数据持久化,这里我选择持久化到nacos;2. 开源版本实现了规则从nacos到sentinel的单向推送,这就使得我们需要修改规则时要在nacos修改配置文件,而不能在sentinel的网页上进行配置,否则重启sentinel服务后还是会丢失配置,而要实现在sentinel网页配置(同步数据到nacos)则需要我们自己动手改造sentinel的代码,关于如何改造sentinel的代码,实现双边的配置同步,请参看:https://www.sangmuen.com/?p=512;
如果我们选择自己改造sentinel代码,一定要做好测试
(1)不做数据持久化 a.引入依赖
1 2 3 4 5 <dependency > <groupId > com.alibaba.cloud</groupId > <artifactId > spring-cloud-starter-alibaba-sentinel</artifactId > </dependency >
b. 修改配置文件
1 2 3 4 5 spring: cloud: sentinel: transport: dashboard: sentinel-ip:8718
c. 在sentinel-dashboard设置服务的限流规则
这里配置的qps的单级阙值为1,意味着每秒该服务可以接受1次请求;
关于qps的概念
1 每秒查询率(QPS) :每秒查询率QPS是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准,在因特网上,作为域名系统服务器的机器的性能经常用每秒查询率来衡量。对应fetches/sec,即每秒的响应请求数,也即是最大吞吐能力。
(2)push模式(数据单向推送) a. 在客户端(即java项目中)引入依赖
1 2 3 4 5 <dependency > <groupId > com.alibaba.csp</groupId > <artifactId > sentinel-datasource-nacos</artifactId > </dependency >
b. 在配置文件中添加配置(可以添加到nacos配置中心)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 spring: cloud: sentinel: transport: dashboard: sentinel-ip:port eager: true filter: enabled: true datasource: ds1: nacos: serverAddr: nacosip:port groupId: DEFAULT_GROUP dataId: flow-rule dataType: json ruleType: flow
ruleType 配置项用于指定规则类型,常见的值包括:
1 2 3 4 5 6 flow:流控规则(控制请求的流量) degrade:降级规则(服务降级) param-flow:参数规则(基于请求参数的流控) system:系统规则(管理系统资源) auth:权限规则(安全访问控制) warmUp:预热规则(平滑流量控制)
c. 在nacos创建流控规则文件’flow-rule‘,文件类型为json,值得说的是并不能指定nacos的命名空间,只能在public中创建,也许这也是非商业版本的功能缺陷之一
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 [ { "resource" : "/sentinelTest/flowControlMode/doDirect" , "limitApp" : "default" , "grade" : 1 , "count" : 1.0 , "clustreMode" : "false" , "strategy" : 0 , "controlBehavior" : 0 } ]
d. 重启微服务,刷新sentinel页面,我们就能看到配置好的流控规则
(3)push模式(数据双向同步)
注:在后续的测试中,我发现这种方式最多添加3条规则,应该是代码的某处被限制了,看来还是只能使用数据单向推送了
a. 正如前文所言,数据的双向同步需要我们改造sentinel的代码,我们先去下载对应的源码,我下载的版本是1.8.0,下载地址:https://github.com/alibaba/Sentinel/releases
b. 打开项目源码,来到sentinel-dashboard模块,打开pom.xml文件进行依赖设置,如下所示将‘test ’注释,让’sentinel-datasource-nacos’生效
1 2 3 4 5 6 <dependency > <groupId > com.alibaba.csp</groupId > <artifactId > sentinel-datasource-nacos</artifactId > </dependency >
c. 打开application.properties文件,添加如下设置
1 2 nacos.server-addr =nacos:8848 nacos.namespace =
d.将test中的‘com.alibaba.csp.sentinel.dashboard.rule.nacos’文件夹中的内容复制到main中的‘com.alibaba.csp.sentinel.dashboard.rule.nacos’
e. 修改NacosConfig类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Configuration public class NacosConfig { @Value("${nacos.server-addr}") private String serverAddr; @Value("${nacos.namespace}") private String namespace; @Bean public Converter<List<FlowRuleEntity>, String> flowRuleEntityEncoder () { return JSON::toJSONString; } @Bean public Converter<String, List<FlowRuleEntity>> flowRuleEntityDecoder () { return s -> JSON.parseArray(s, FlowRuleEntity.class); } @Bean public ConfigService nacosConfigService () throws Exception { Properties properties = new Properties (); properties.put(PropertyKeyConst.SERVER_ADDR, this .serverAddr); properties.put(PropertyKeyConst.NAMESPACE, this .namespace); return ConfigFactory.createConfigService(properties); } }
可以看到,这里主要是将前文设置的nacos.server-addr和nacos.namespace添加到配置中,namespace如果不设置默认为‘public’;另外在类‘NacosConfigUtil’中有关于GROUP_ID和FLOW_DATA_ID_POSTFIX(dataId的后缀)的设置,GROUP_ID默认为‘SENTINEL_GROUP’,dataId后缀默认为‘-flow-rules’;nacos中的配置文件即根据这些设置来创建
f. 打开webapp中的‘resources/app/scripts/directives/sidebar/sidebar.html’文件,打开‘流控规则 V1’栏目的注释
1 2 3 4 <li ui-sref-active ="active" ng-if ="entry.appType==0" > <a ui-sref ="dashboard.flow({app: entry.app})" > <i class ="glyphicon glyphicon-filter" > </i > 流控规则 V1</a > </li >
g. 用Maven命令‘mvn clean package -DskipTests’来构建jar包
h. 参看前文‘sentinel安装’-‘用docker-compose启动’一节,将构建好的jar包替代之前的jar包,注意保持Dockerfile中的jar包名称与实际一致,运行命令重新构建sentinel的镜像,并且部署容器
i. 重新登录sentinel控制台,可以看到左边栏新增了一个栏目‘流控规则 V1’,这就是我们上文在sidebar.html中打开的注释,我们在其中增加或修改流控规则的配置,就可以同步数据到nacos(namespace、groupId和dataId后缀和sentinel中的配置要保持一致,因为namespace在sentinel中配置为空,所以在nacos中是默认的public)
9. spring-cloud-stream和rocketmq实现消息发送 a. 创建子模块‘service-provider’和‘service-consumer’ b. 添加依赖 前面我们已经引入springboot、springcloud、springcloudalibaba的框架,这里不再赘述;子模块中我们用到的组件为nacos和rocketmq,添加的依赖如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <!-- web mvc --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- nacos服务发现 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <!-- nacos配置中心 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency> <!-- Spring Cloud Stream 对 RocketMQ发送和接收信息 --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency>
注:可以看到我们引入的依赖是‘spring-cloud-starter-stream-rocketmq’,这个依赖引入了spring-cloud-stream,stream这也是spring-cloud中的一个组件,它的作用是屏蔽不同消息中间件的差异,让java项目能以统一的方式管理消息的生产和消费;这里仅仅展示其最简单的用法,更详细的后续再写文章
c. 添加配置文件 我们这里使用nacos配置中心来保存配置文件(既然引入了就用起来),当然先要引入nacos的配置,在项目中resources目录中添加‘bootstrap.yml’文件,内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 spring: cloud: nacos: discovery: server-addr: nacosip:8848 namespace: namespaceid config: server-addr: nacosip:8848 namespace: namespaceid group: DEFAULT_GROUP prefix: service-provider file-extension: yaml
再在nacos配置中心创建对应‘service-provider’、‘service-consumer’文件,内容分别如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 server: port: 8004 spring: application: name: service-provider cloud: stream: rocketmq: binder: name-server: localhost:9876 group: rocketmq-group bindings: output: binder: rocketmq destination: PRAISE-TOPIC-01 group: praise-test content-type: application/json
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 server: port: 8005 spring: application: name: service-consumer cloud: stream: rocketmq: binder: name-server: localhost:9876 group: rocketmq-group bindings: input: binder: rocketmq destination: PRAISE-TOPIC-01 group: praise-test content-type: application/json
d. 在启动类添加注解 在模块’service-provider’的启动类中添加注解@EnableBinding(Source.class)
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableDiscoveryClient @EnableBinding(Source.class) public class ProviderApplication { public static void main (String[] args) { SpringApplication.run(ProviderApplication.class, args); } }
在模块’service-consumer’的启动类中添加注解@EnableBinding(Sink.class)
1 2 3 4 5 6 7 8 @SpringBootApplication @EnableDiscoveryClient @EnableBinding(Sink.class) public class ConsumerApplication { public static void main (String[] args) { SpringApplication.run(ConsumerApplication.class, args); } }
e. 在‘service-provider’模块中添加代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @RequestMapping("/test") @RestController public class TestController { @Resource private Source source; @PostMapping("/sendMessage") public Result sendMessage (@RequestBody User user) { boolean res = source.output().send(MessageBuilder.withPayload(user).build()); if (res) { return Result.ok().message("消息发送成功" ); } else { return Result.error().message("消息发送失败" ); } } }
f. 在‘service-consumer’模块中添加代码 1 2 3 4 5 6 7 8 9 10 11 @Slf4j @Component public class TestListener { @Resource private Sink sink; @StreamListener(value = Sink.INPUT) public void processMessage (User user) { log.info("接收到的消息:" + user.toString()); } }
g. 启动项目进行测试 我们分别启动‘service-provider’和‘service-conusmer’,然后调用‘service-provider’的测试接口发送消息
再到‘service-consumer’的控制台查看日志
可以看到,我们发送的消息已经被服务‘service-consumer’消费到了