CN116225735A - Extension method of message-driven programming model based on Spring Cloud Stream - Google Patents
Extension method of message-driven programming model based on Spring Cloud Stream Download PDFInfo
- Publication number
- CN116225735A CN116225735A CN202310054092.0A CN202310054092A CN116225735A CN 116225735 A CN116225735 A CN 116225735A CN 202310054092 A CN202310054092 A CN 202310054092A CN 116225735 A CN116225735 A CN 116225735A
- Authority
- CN
- China
- Prior art keywords
- message
- message queue
- spring
- channel
- programming model
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Granted
Links
Images
Classifications
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/54—Interprogram communication
- G06F9/546—Message passing systems or structures, e.g. queues
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/44—Arrangements for executing specific programs
- G06F9/445—Program loading or initiating
- G06F9/44505—Configuring for program initiating, e.g. using registry, configuration files
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F2209/00—Indexing scheme relating to G06F9/00
- G06F2209/54—Indexing scheme relating to G06F9/54
- G06F2209/547—Messaging middleware
-
- Y—GENERAL TAGGING OF NEW TECHNOLOGICAL DEVELOPMENTS; GENERAL TAGGING OF CROSS-SECTIONAL TECHNOLOGIES SPANNING OVER SEVERAL SECTIONS OF THE IPC; TECHNICAL SUBJECTS COVERED BY FORMER USPC CROSS-REFERENCE ART COLLECTIONS [XRACs] AND DIGESTS
- Y02—TECHNOLOGIES OR APPLICATIONS FOR MITIGATION OR ADAPTATION AGAINST CLIMATE CHANGE
- Y02D—CLIMATE CHANGE MITIGATION TECHNOLOGIES IN INFORMATION AND COMMUNICATION TECHNOLOGIES [ICT], I.E. INFORMATION AND COMMUNICATION TECHNOLOGIES AIMING AT THE REDUCTION OF THEIR OWN ENERGY USE
- Y02D10/00—Energy efficient computing, e.g. low power processors, power management or thermal management
Landscapes
- Engineering & Computer Science (AREA)
- Software Systems (AREA)
- Theoretical Computer Science (AREA)
- Physics & Mathematics (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Stored Programmes (AREA)
- Data Exchanges In Wide-Area Networks (AREA)
- Telephonic Communication Services (AREA)
- Information Transfer Between Computers (AREA)
Abstract
本发明公开了基于SpringCloudStream的消息驱动编程模型的扩展方法,包括如下步骤:S1、启动Spring项目,初始化消息队列中间件的环境配置;S2、初始化消息队列消费者的配置并调用内置的消息队列消费者处理器以动态创建消息输入通道,将注册至Spring容器中;S3、采用@MQListener注解方法动态创建并缓存消息订阅通道;S4、Spring项目启动完成后,遍历消息订阅通道内的缓存进行消息队列消费者的绑定和启动;S5、初始化消息队列生产者的配置,调用内置的消息队列生产者处理器以动态创建、绑定消息输出通道,并发送消息。本发明的方法实现了在消息生产和消息消费中动态创建并绑定消息通道的特性,从而支持更加灵活的消息驱动的业务场景。
The invention discloses an extension method of a message-driven programming model based on SpringCloudStream, comprising the following steps: S1, starting the Spring project, initializing the environment configuration of the message queue middleware; S2, initializing the configuration of the message queue consumer and invoking the built-in message queue consumption The processor will dynamically create a message input channel, which will be registered in the Spring container; S3, use the @MQListener annotation method to dynamically create and cache the message subscription channel; S4, after the Spring project is started, traverse the cache in the message subscription channel for message queuing Binding and starting of the consumer; S5. Initialize the configuration of the message queue producer, call the built-in message queue producer processor to dynamically create and bind the message output channel, and send the message. The method of the present invention realizes the feature of dynamically creating and binding message channels in message production and message consumption, thereby supporting more flexible message-driven business scenarios.
Description
技术领域technical field
本发明涉及计算机技术领域,特别涉及基于Spring Cloud Stream的消息驱动编程模型的扩展方法。The invention relates to the field of computer technology, in particular to an extension method of a message-driven programming model based on Spring Cloud Stream.
背景技术Background technique
Spring Cloud Stream是一个构建消息驱动微服务的框架,它解决了研发人员无感知的使用各种消息中间件的问题。Spring Cloud Stream依赖配置调整来支持不同类型消息中间件的动态切换和相应消息通道(包括默认消息通道或自定义消息通道)的维护,使得应用可以更多的关注自身的业务实现。Spring Cloud Stream is a framework for building message-driven microservices, which solves the problem of developers using various message middleware without awareness. Spring Cloud Stream relies on configuration adjustments to support the dynamic switching of different types of message middleware and the maintenance of corresponding message channels (including default message channels or custom message channels), so that applications can pay more attention to their own business implementation.
目前Spring Cloud Stream的编程模型采用的是预先通过配置文件方式绑定消息通道,然后在消息生产和消息监听的消费处理类中通过Spring Bean(框架运行时的管理对象)的注入和注解的方式以实现对应消息通道的显示和声明。该方式存在的问题在于:在消息生产和消息监听的消费中无法动态创建并绑定消息通道,当程序需要根据动态的运行条件(如集团ID)动态切换到对应的MQ消费者和MQ生产者时(MQ,Message Queue,即消息队列),需要采用预先配置消息输入/输出通道的方法,不仅配置项冗长复杂难以维护,而且还需创建大量相同功能的支持类文件,从而造成程序文件数量的大量增加。At present, the programming model of Spring Cloud Stream is to bind the message channel through the configuration file in advance, and then inject and annotate the Spring Bean (the management object of the framework runtime) in the consumption processing class of message production and message monitoring. Realize the display and declaration of the corresponding message channel. The problem with this method is that it is impossible to dynamically create and bind message channels in the consumption of message production and message monitoring. When the program needs to dynamically switch to the corresponding MQ consumer and MQ producer according to dynamic operating conditions (such as group ID) (MQ, Message Queue, that is, message queue), it is necessary to use the method of pre-configuring the message input/output channel. Not only the configuration items are lengthy and complicated, it is difficult to maintain, but also a large number of supporting class files with the same function need to be created, resulting in a large number of program files. Increase rapidly.
发明内容Contents of the invention
为解决上述问题,本发明提供了基于Spring Cloud Stream的消息驱动编程模型的扩展方法。In order to solve the above problems, the present invention provides an extension method of the message-driven programming model based on Spring Cloud Stream.
本发明采用以下技术方案:The present invention adopts following technical scheme:
基于Spring Cloud Stream的消息驱动编程模型的扩展方法,包括如下步骤:The extension method of the message-driven programming model based on Spring Cloud Stream includes the following steps:
S1、启动Spring项目,初始化消息队列中间件的环境配置;S1. Start the Spring project and initialize the environment configuration of the message queue middleware;
S2、在Spring Bean生命周期的初始化回调方法中,初始化消息队列消费者的配置并调用内置的消息队列消费者处理器以动态创建消息输入通道,将注册至Spring容器中;S2. In the initialization callback method of the Spring Bean life cycle, initialize the configuration of the message queue consumer and call the built-in message queue consumer processor to dynamically create a message input channel, which will be registered in the Spring container;
S3、在Spring Bean生命周期的后置处理器初始化回调方法中,采用@MQListener注解方法动态创建并缓存消息订阅通道;S3. In the post-processor initialization callback method of the Spring Bean life cycle, the @MQListener annotation method is used to dynamically create and cache the message subscription channel;
S4、Spring项目启动完成后,遍历消息订阅通道内的缓存进行消息队列消费者的绑定和启动;S4. After the Spring project is started, traverse the cache in the message subscription channel to bind and start the message queue consumer;
S5、初始化消息队列生产者的配置,调用内置的消息队列生产者处理器以动态创建、绑定消息输出通道,并发送消息。S5. Initialize the configuration of the message queue producer, call the built-in message queue producer processor to dynamically create and bind the message output channel, and send the message.
进一步地,所述消息队列中间件的环境配置包括Spring Cloud Stream配置项下的默认绑定者和绑定者列表消息。Further, the environment configuration of the message queue middleware includes a default binder and a binder list message under the Spring Cloud Stream configuration item.
进一步地,所述绑定者列表信息包括消息队列生产者策略配置信息、消息队列消费者策略配置信息和消息队列服务器连接信息。Further, the binder list information includes message queue producer policy configuration information, message queue consumer policy configuration information and message queue server connection information.
进一步地,所述消息队列消费者的配置包括通道主题、异常重试策略、消费分组以及绑定者。Further, the configuration of the message queue consumer includes channel topics, exception retry policies, consumption groups and binders.
进一步地,所述消息订阅通道用于监听所述消息队列消费者的对应通道主题下的消息队列中的消息。Further, the message subscription channel is used to monitor messages in the message queue under the corresponding channel topic of the message queue consumer.
进一步地,所述Spring Bean生命周期包括在所述Spring项目的启动阶段进行Bean对象的实例化、属性填充和初始化过程。Further, the Spring Bean life cycle includes instantiation, property filling and initialization of Bean objects during the startup phase of the Spring project.
进一步地,步骤S4中,只有当消息订阅通道内缓存中的消息队列消费者全部绑定成功后才开始进行逐个启动消息队列消费者,若任意一个消息队列消费者启动失败则停止全部消息队列消费者的启动。Further, in step S4, start message queue consumers one by one only when all message queue consumers in the cache in the message subscription channel are successfully bound, and stop all message queue consumption if any message queue consumer fails to start the startup of the
进一步地,所述消息队列生产者的配置包括通道主题、生产者分组以及绑定者。Further, the configuration of the message queue producer includes channel topics, producer groups and binders.
进一步地,所述内置的消息队列消费者处理器用于统一执行消息输入通道的动态创建和消息队列消费监听处理器的Bean逻辑的注册。Further, the built-in message queue consumer processor is used to uniformly execute the dynamic creation of the message input channel and the registration of the Bean logic of the message queue consumption monitoring processor.
进一步地,内置的消息队列生产者处理器用于统一执行消息输出通道的动态创建、消息队列生产者处理器的Bean逻辑的注册和消息输出通道的绑定。Furthermore, the built-in message queue producer processor is used to uniformly execute the dynamic creation of the message output channel, the registration of the Bean logic of the message queue producer processor and the binding of the message output channel.
采用上述技术方案后,本发明与背景技术相比,具有如下优点:After adopting the technical solution, the present invention has the following advantages compared with the background technology:
本发明扩展了Spring Cloud Stream的消息驱动编程模型,使其具备在消息生产和消息消费中动态创建并绑定消息通道的特性,从而支持更加灵活的消息驱动的业务场景。The present invention extends the message-driven programming model of Spring Cloud Stream, so that it has the characteristics of dynamically creating and binding message channels in message production and message consumption, thereby supporting more flexible message-driven business scenarios.
附图说明Description of drawings
图1为本发明的方法流程图。Fig. 1 is a flow chart of the method of the present invention.
具体实施方式Detailed ways
为了使本发明的目的、技术方案及优点更加清楚明白,以下结合附图及实施例,对本发明进行进一步详细说明。应当理解,此处所描述的具体实施例仅用以解释本发明,并不用于限定本发明。In order to make the object, technical solution and advantages of the present invention clearer, the present invention will be further described in detail below in conjunction with the accompanying drawings and embodiments. It should be understood that the specific embodiments described here are only used to explain the present invention, not to limit the present invention.
实施例Example
如图1所示,基于Spring Cloud Stream的消息驱动编程模型的扩展方法,包括如下步骤:As shown in Figure 1, the extension method of the message-driven programming model based on Spring Cloud Stream includes the following steps:
S1、启动Spring项目,初始化消息队列中间件的环境配置;S1. Start the Spring project and initialize the environment configuration of the message queue middleware;
所述消息队列中间件的环境配置包括Spring Cloud Stream配置项下的默认绑定者和绑定者列表消息。所述绑定者列表信息包括消息队列生产者策略配置信息、消息队列消费者策略配置信息和消息队列服务器连接信息。The environment configuration of the message queue middleware includes a default binder and a binder list message under the Spring Cloud Stream configuration item. The binder list information includes message queue producer policy configuration information, message queue consumer policy configuration information and message queue server connection information.
S2、在Spring Bean生命周期的初始化回调方法中,初始化消息队列消费者的配置并调用内置的消息队列消费者处理器以动态创建消息输入通道,将注册至Spring容器中;S2. In the initialization callback method of the Spring Bean life cycle, initialize the configuration of the message queue consumer and call the built-in message queue consumer processor to dynamically create a message input channel, which will be registered in the Spring container;
所述消息队列消费者的配置包括通道主题、异常重试策略、消费分组以及绑定者。所述内置的消息队列消费者处理器用于统一执行消息输入通道的动态创建和消息队列消费监听处理器的Bean逻辑的注册。The configuration of the message queue consumer includes channel topic, exception retry policy, consumption group and binder. The built-in message queue consumer processor is used to uniformly execute the dynamic creation of the message input channel and the registration of the Bean logic of the message queue consumption monitoring processor.
S3、在Spring Bean生命周期的后置处理器初始化回调方法中,采用@MQListener注解方法动态创建并缓存消息订阅通道;S3. In the post-processor initialization callback method of the Spring Bean life cycle, the @MQListener annotation method is used to dynamically create and cache the message subscription channel;
所述消息订阅通道用于监听所述消息队列消费者的对应通道主题下的消息队列中的消息。所述Spring Bean生命周期包括在所述Spring项目的启动阶段进行Bean对象的实例化、属性填充和初始化过程。The message subscription channel is used to monitor messages in the message queue under the corresponding channel topic of the message queue consumer. The Spring Bean life cycle includes the Bean object instantiation, attribute filling and initialization process in the startup phase of the Spring project.
S4、Spring项目启动完成后,遍历消息订阅通道内的缓存进行消息队列消费者的绑定和启动;S4. After the Spring project is started, traverse the cache in the message subscription channel to bind and start the message queue consumer;
步骤S4中,只有当消息订阅通道内缓存中的消息队列消费者全部绑定成功后才开始进行逐个启动消息队列消费者,若任意一个消息队列消费者启动失败则停止全部消息队列消费者的启动。In step S4, start the message queue consumers one by one only after all the message queue consumers in the cache in the message subscription channel are successfully bound, and stop the startup of all message queue consumers if any message queue consumer fails to start .
S5、初始化消息队列生产者的配置,调用内置的消息队列生产者处理器以动态创建、绑定消息输出通道,并发送消息。S5. Initialize the configuration of the message queue producer, call the built-in message queue producer processor to dynamically create and bind the message output channel, and send the message.
所述消息队列生产者的配置包括通道主题、生产者分组以及绑定者。内置的消息队列生产者处理器用于统一执行消息输出通道的动态创建、消息队列生产者处理器的Bean逻辑的注册和消息输出通道的绑定。The configuration of the message queue producer includes channel topic, producer group and binder. The built-in message queue producer processor is used to uniformly execute the dynamic creation of the message output channel, the registration of the Bean logic of the message queue producer processor and the binding of the message output channel.
本实施例扩展了Spring Cloud Stream的消息驱动编程模型,使其具备在消息生产和消息消费中动态创建并绑定消息通道的特性,从而支持更加灵活的消息驱动的业务场景。This embodiment extends the message-driven programming model of Spring Cloud Stream so that it has the feature of dynamically creating and binding message channels in message production and message consumption, thereby supporting more flexible message-driven business scenarios.
以上所述,仅为本发明较佳的具体实施方式,但本发明的保护范围并不局限于此,任何熟悉本技术领域的技术人员在本发明揭露的技术范围内,可轻易想到的变化或替换,都应涵盖在本发明的保护范围之内。因此,本发明的保护范围应该以权利要求的保护范围为准。The above is only a preferred embodiment of the present invention, but the scope of protection of the present invention is not limited thereto. Any person skilled in the art within the technical scope disclosed in the present invention can easily think of changes or Replacement should be covered within the protection scope of the present invention. Therefore, the protection scope of the present invention should be determined by the protection scope of the claims.
Claims (10)
Priority Applications (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202310054092.0A CN116225735B (en) | 2023-02-03 | 2023-02-03 | Extension methods based on Spring Cloud Stream's message-driven programming model |
Applications Claiming Priority (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202310054092.0A CN116225735B (en) | 2023-02-03 | 2023-02-03 | Extension methods based on Spring Cloud Stream's message-driven programming model |
Publications (2)
| Publication Number | Publication Date |
|---|---|
| CN116225735A true CN116225735A (en) | 2023-06-06 |
| CN116225735B CN116225735B (en) | 2025-12-02 |
Family
ID=86590391
Family Applications (1)
| Application Number | Title | Priority Date | Filing Date |
|---|---|---|---|
| CN202310054092.0A Active CN116225735B (en) | 2023-02-03 | 2023-02-03 | Extension methods based on Spring Cloud Stream's message-driven programming model |
Country Status (1)
| Country | Link |
|---|---|
| CN (1) | CN116225735B (en) |
Citations (5)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN106933589A (en) * | 2017-03-13 | 2017-07-07 | 车智互联(北京)科技有限公司 | A kind of message queue component based on configuration and its integrated method |
| EP3337103A1 (en) * | 2016-12-16 | 2018-06-20 | DreamWorks Animation LLC | Scalable messaging system |
| WO2020107016A1 (en) * | 2018-11-22 | 2020-05-28 | Jeffrey Alan Carley | Message broker customization with user administered policy functions |
| CN111444449A (en) * | 2018-12-27 | 2020-07-24 | 北京奇虎科技有限公司 | A kind of Http request processing method and device |
| US20200241942A1 (en) * | 2019-01-28 | 2020-07-30 | Salesforce.Com, Inc. | Method and system for processing a stream of incoming messages sent from a specific input message source and validating each incoming message of that stream before sending them to a specific target system |
-
2023
- 2023-02-03 CN CN202310054092.0A patent/CN116225735B/en active Active
Patent Citations (5)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| EP3337103A1 (en) * | 2016-12-16 | 2018-06-20 | DreamWorks Animation LLC | Scalable messaging system |
| CN106933589A (en) * | 2017-03-13 | 2017-07-07 | 车智互联(北京)科技有限公司 | A kind of message queue component based on configuration and its integrated method |
| WO2020107016A1 (en) * | 2018-11-22 | 2020-05-28 | Jeffrey Alan Carley | Message broker customization with user administered policy functions |
| CN111444449A (en) * | 2018-12-27 | 2020-07-24 | 北京奇虎科技有限公司 | A kind of Http request processing method and device |
| US20200241942A1 (en) * | 2019-01-28 | 2020-07-30 | Salesforce.Com, Inc. | Method and system for processing a stream of incoming messages sent from a specific input message source and validating each incoming message of that stream before sending them to a specific target system |
Also Published As
| Publication number | Publication date |
|---|---|
| CN116225735B (en) | 2025-12-02 |
Similar Documents
| Publication | Publication Date | Title |
|---|---|---|
| US12468552B2 (en) | Execution of sub-application processes within application program | |
| US11803451B2 (en) | Application exception recovery | |
| JP6009459B2 (en) | Store and resume application runtime state | |
| CN106164866B (en) | Efficient migration of client-side WEB state | |
| US10146599B2 (en) | System and method for a generic actor system container application | |
| CN107832099B (en) | A client version compatible method, device and storage medium | |
| WO2022037612A1 (en) | Method for providing application construction service, and application construction platform, application deployment method and system | |
| US20080209390A1 (en) | Pluggable model elements | |
| WO2022134358A1 (en) | Microservice data processing method, apparatus, microservice processing platform, and medium | |
| JP2015534145A (en) | User interface control framework for stamping out controls using declarative templates | |
| US8499294B2 (en) | Persisting the changes for managed components in an application server | |
| WO2023065707A1 (en) | Method and apparatus for page display | |
| CN101907989A (en) | A method for seamless application migration based on mobile agent | |
| CN108008950B (en) | Method and device for realizing user interface updating | |
| WO2019201340A1 (en) | Processor core scheduling method and apparatus, terminal, and storage medium | |
| WO2025124172A1 (en) | Method and apparatus for component deployment and updating, computer device, and storage medium | |
| CN106055348B (en) | A kind of method and apparatus that notice system property updates | |
| KR102332809B1 (en) | Stream based event processing utilizing virtual streams and processing agents | |
| CN112235132B (en) | Method, device, medium and server for dynamically configuring service | |
| CN118193499A (en) | Device, method and system for heterogeneous database full migration | |
| CN111966508A (en) | Method, device, computer equipment and storage medium for batch sending of messages | |
| CN115658109A (en) | Microservice hot deployment method, system, electronic device and storage medium | |
| CN110673827B (en) | Android system-based resource calling method, device, and electronic equipment | |
| CN101634945A (en) | Dynamic sensing model of members and application thereof to Web presentation layer of RFID middleware | |
| CN114579164A (en) | Method and device for updating user interface of application program |
Legal Events
| Date | Code | Title | Description |
|---|---|---|---|
| PB01 | Publication | ||
| PB01 | Publication | ||
| SE01 | Entry into force of request for substantive examination | ||
| SE01 | Entry into force of request for substantive examination | ||
| GR01 | Patent grant |