CN119884229B - A processing method and system supporting multi-source heterogeneous data - Google Patents
A processing method and system supporting multi-source heterogeneous data
- Publication number
- CN119884229B (application CN202510363080.5A)
- Authority
- CN
- China
- Prior art keywords
- data
- time
- real
- etl
- data processing
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Active
Classifications
- G06F16/254—Extract, transform and load [ETL] procedures, e.g. ETL data flows in data warehouses
- G06F16/24568—Data stream processing; Continuous queries
- G06F16/248—Presentation of query results
- G06F16/283—Multi-dimensional databases or data warehouses, e.g. MOLAP or ROLAP
- G06F21/604—Tools and structures for managing or administering access control systems
- G06F21/6218—Protecting access to data via a platform, e.g. using keys or access control rules, to a system of files or objects, e.g. local or distributed file system or database
- G06N3/045—Combinations of networks
- G06N3/092—Reinforcement learning
- G06F2221/2141—Access rights, e.g. capability lists, access control lists, access tables, access matrices
Landscapes
- Engineering & Computer Science (AREA)
- Theoretical Computer Science (AREA)
- Physics & Mathematics (AREA)
- Databases & Information Systems (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Data Mining & Analysis (AREA)
- Health & Medical Sciences (AREA)
- General Health & Medical Sciences (AREA)
- Computational Linguistics (AREA)
- Software Systems (AREA)
- Computer Security & Cryptography (AREA)
- Life Sciences & Earth Sciences (AREA)
- Artificial Intelligence (AREA)
- Biomedical Technology (AREA)
- Biophysics (AREA)
- Computer Hardware Design (AREA)
- Bioethics (AREA)
- Evolutionary Computation (AREA)
- Molecular Biology (AREA)
- Computing Systems (AREA)
- Mathematical Physics (AREA)
- Automation & Control Theory (AREA)
- Management, Administration, Business Operations System, And Electronic Commerce (AREA)
Abstract
The invention relates to the technical field of data processing, and in particular to a processing method and system supporting multi-source heterogeneous data. The method comprises: collecting data from different target systems and adding to each piece of data an identification tag of the corresponding target system; constructing a plurality of real-time pipelines parallel to the ETL path to process the collected data, writing data that fail parsing into an error data area and generating warning information, and writing the collected data into a temporary storage area after removing the data that fail parsing; writing the output of the real-time data processing system and/or the output of the ETL path into a data warehouse; and creating a visual dynamic report from the data in the data warehouse, analyzing and predicting the report data with a pre-trained machine learning model, and writing the prediction results into the report, so as to predict inventory demand and identify supplier risks.
Description
Technical Field
The present invention relates to the field of data processing technologies, and in particular, to a processing method and system supporting multi-source heterogeneous data.
Background
Heterogeneous data refers to a collection of data that differs in structure, format, type, or manner of storage. Such data typically comes from different data sources or systems and is organized in a variety of forms.
In the SPD business of medical systems, reports covering core data such as products, purchasing and inventory must be generated, which requires integrating data of different structures from different systems through a multi-source heterogeneous data processing system. Existing systems typically combine ETL (Extract-Transform-Load) and data warehouse techniques with serialization frameworks and metadata management when handling multi-source heterogeneous data. However, conventional ETL tools depend on batch processing and struggle to cope with real-time data streams, and inconsistent permission models across the integrated systems easily lead to leakage of sensitive information.
Disclosure of Invention
According to the invention, the open source stream processing platform Kafka and the stream data processing engine Flink are combined to construct a real-time pipeline, and a plurality of real-time pipelines form a stream data processing system, so that the capability of the system for processing real-time data is improved.
The technical solution provided by the invention is a processing method supporting multi-source heterogeneous data, comprising the following steps:
Collecting data from different target systems, and adding an identification tag of a corresponding target system to each piece of data;
constructing a plurality of real-time pipelines parallel to the ETL path to process the collected data, writing data that fail parsing into an error data area, and generating warning information;
writing the output of the real-time data processing system and/or the output of the ETL path into a data warehouse;
And creating a visual dynamic report by utilizing the data in the data warehouse, analyzing and predicting the dynamic report data by a pre-trained machine learning model, and writing a prediction result into the dynamic report.
Preferably, the data that are parsed successfully form a correct data set, and the data that fail parsing form an error data set;
Before acquiring data from different target systems, defining global strategy and unifying authority of each target system;
The defining global policy includes:
registering the target system in a console of the security management framework Ranger, and importing the resource hierarchy of the target system, wherein the resource hierarchy comprises tables and column groups;
selecting corresponding resources according to the resource levels of all target systems, and designating operations corresponding to the resources, including permission operations and refusal operations;
The unified target system authorities comprise:
mapping the resources of each target system into the unified identifier of the Ranger;
Mapping the authority model of each target system into a unified resource-operation-condition model;
creating a global role, and associating the global role with a corresponding model to bind the resources of a corresponding target system, wherein the conditions comprise an IP range and effective duration of operation;
mapping the authority model of each target system into a unified resource-operation-condition model comprises the following steps:
synchronizing the clocks of Ranger and the target system using the NTP protocol;
Setting an expansion plug-in for the authority model of each target system, and realizing the change of the authority model by utilizing the polling of the expansion plug-in;
The polling interval of the expansion plug-in is dynamically adjusted to ensure that the permission model of the target system can be mapped in time.
Preferably, the dynamically adjusting the polling interval of the extension plugin to ensure that the permission model of the target system can be mapped in time includes:
recording the initial time of each model mapping, and returning the mapping completion time after the authority model completes the mapping;
dynamically calculating a new polling interval through a sliding window weighted average delay and a proportional-integral control algorithm;
and monitoring delay, and replacing the current polling interval with a new polling interval to adjust the polling interval, so as to ensure that the permission model of the target system can be mapped in time.
Preferably, the dynamically calculating the new polling interval by the sliding window weighted average delay and the proportional integral control algorithm includes:
after each authority model mapping is completed, calculating the time difference ΔT = T_end − T_start, where T_end denotes the time at which the mapping is completed and T_start denotes the start time of the model mapping;
calculating the weighted average delay D_avg = Σ_{i=1..N} w_i·ΔT_{t−i}, where N denotes the size of the sliding window, w_i denotes the weight coefficients, and ΔT_{t−i} denotes the delay at the i-th time point before the current time t;
calculating the control variable u = K_p1·(D_avg − D_tgt) + K_i1·Σ_j (D_avg,j − D_tgt), where D_tgt denotes the target delay threshold, K_i1 denotes integral gain one, K_p1 denotes proportional gain one, and D_avg,j denotes the average delay of the j-th sliding window;
calculating the new polling interval T_new = min(T_cur + u, T_max), where T_cur denotes the current polling interval and T_max denotes the maximum value of the polling interval;
if ΔT exceeds a preset delay upper bound, replacing the current polling interval directly with a preset fallback interval and skipping the calculation of the control variable.
Preferably, the defining the global policy further includes:
creating a business semantic label;
associating the business semantic tags with resources of different target systems;
and identifying global role information, distributing different service semantic tags for different global roles, and accessing corresponding resources by the global roles through the corresponding service semantic tags.
Preferably, constructing a plurality of real-time pipelines parallel to the ETL path to process the acquired data, including:
combining an open source stream processing platform Kafka with a stream data processing engine Flink to construct a real-time pipeline, wherein a plurality of real-time pipelines form a stream data processing system so as to process a plurality of data streams;
if the target system only supports batch file transmission, setting a batch file transmission plug-in in Kafka and pulling the data from the temporary storage area;
after the acquired data set is cleaned, data is acquired in real time through a sliding time window, so that a data stream is formed;
writing the data stream into a preset target storage area and locking it, unlocking the target storage area after Flink completes the state checkpoint, and outputting the data;
and constructing a gradual switching strategy, controlling the switching process of the ETL to the real-time pipeline, and gradually switching the ETL data processing task of the target system to the real-time pipeline.
Preferably, the step of constructing a gradual switching strategy, controlling a process of switching the ETL to the real-time pipeline, gradually switching the ETL data processing task of the target system to the real-time pipeline, includes:
quantifying risk indexes in the switching process, wherein the risk indexes comprise the data difference rate before and after switching, the real-time pipeline processing delay time, the Flink checkpoint failure rate and the downstream report generation error rate;
writing data into the Kafka and the original ETL path at the same time, comparing the output of the ETL path with the output of the real-time pipeline, and stopping switching if the data difference rate of the two outputs is greater than a preset difference rate threshold;
Switching non-core task data to a real-time pipeline, wherein the core task data is still transmitted through an original ETL path;
Detecting the transmission quantity of the core task data; if the transmission quantity of the core task data is lower than a preset transmission quantity threshold within a preset time period, switching the core task data to the real-time pipeline while keeping the original ETL path for a reserved period of at least one month;
When the real-time pipeline processing delay time exceeds a preset delay time threshold, cutting off the real-time pipeline, and automatically selecting an original ETL path to transmit data.
Preferably, the step of constructing a gradual switching strategy, controlling a process of switching the ETL to the real-time pipeline, gradually switching the ETL data processing task of the target system to the real-time pipeline, and further includes:
Acquiring real-time data processing system output and ETL path output;
Calculating the relative entropy of the real-time data processing system output and the ETL path output:
D_KL(P||Q) = Σ_x P(x)·log(P(x)/Q(x)), where P represents the probability distribution of the key data in the output of the real-time data processing system, Q represents the probability distribution of the same key data in the ETL output, and D_KL denotes the Kullback-Leibler divergence;
the switching process is dynamically adjusted by a PID controller, comprising the following sub-steps:
setting the proportion r(t) of data flowing into the real-time data processing system as:
r(t) = K_p2·e(t) + K_i2·∫e(τ)dτ + K_d·de(t)/dt;
wherein e(t) denotes the control error determined from the relative entropy D_KL(P||Q); K_p2 denotes proportional gain two, K_i2 denotes integral gain two, and K_d denotes the differential gain;
setting a sliding time window II, collecting the output data of the real-time data processing system and the ETL path in real time, and calculating P and Q;
calculating, from P and Q, the change Δr in the proportion of data flowing into the real-time data processing system after one sliding time window II, wherein W2 denotes the length of sliding time window II;
dynamically optimizing K_p2, K_i2 and K_d through the reinforcement learning algorithm PPO, comprising the following sub-steps:
setting a state space S = {e(t), I(t), Δe(t)},
wherein the integral term I(t) = ∫e(τ)dτ and the differential term Δe(t) = de(t)/dt;
setting an action space A = {ΔK_p2, ΔK_i2, ΔK_d};
wherein ΔK_p2 denotes the adjustment amount of proportional gain two,
ΔK_i2 denotes the adjustment amount of integral gain two,
and ΔK_d denotes the adjustment amount of the differential gain;
setting a reward function R = −(λ1·|e(t)| + λ2·|Δr(t)| + λ3·σ);
wherein λ1 represents the weight of the error, λ2 represents the penalty on the change of the control quantity, and λ3 represents the overshoot penalty;
the overshoot σ = r(t) − r(t−1), wherein r(t−1) denotes the proportion of data flowing into the real-time data processing system at the time point previous to the current time t;
constructing the reinforcement learning algorithm with an Actor-Critic framework, and updating the elements of the state space with the elements of the action space, so as to realize the updating of K_p2, K_i2 and K_d;
the updating with the elements of the action space is specifically: K_p2' = K_p2 + ΔK_p2, K_i2' = K_i2 + ΔK_i2, K_d' = K_d + ΔK_d, wherein K_p2', K_i2' and K_d' represent the updated proportional gain two, integral gain two and differential gain.
Preferably, the creating a visual dynamic report by using data in a data warehouse, analyzing and predicting the dynamic report data by using a pre-trained machine learning model, and writing the prediction result into the dynamic report, includes:
Acquiring data of different target systems from a data warehouse, and creating a dynamic report by using a BI tool Tableau;
loading a machine learning model, inputting data of different target systems to generate corresponding prediction results, and writing the prediction results into a dynamic report, wherein the method specifically comprises the following steps:
Acquiring data from a data warehouse, and identifying the data of different target systems through identification tags of the target systems;
Extracting characteristic values of data of different target systems, and then carrying out standardization processing to form a target system characteristic set;
inputting elements in the target system feature set as input variables into a machine learning model, and outputting a prediction result;
and writing the predicted result into a data warehouse, reading the predicted result from the data warehouse by using a BI tool, writing the predicted result into a dynamic report, and updating the dynamic report, wherein the machine learning model comprises one or more of a time sequence prediction model LSTM, a classification model XGBoost and a linear regression model.
A processing system supporting multi-source heterogeneous data comprises a server, a processor, a communication module, a data acquisition interface and a memory, wherein the communication module is in communication connection with the processor, the processor is connected with the server through the communication module, and the system is used for executing the processing method supporting the multi-source heterogeneous data.
The invention has the beneficial effects that:
According to the invention, the traditional ETL is replaced with real-time pipelines and a progressive switching strategy is constructed to control the process of switching from ETL to the real-time pipelines, so that the ETL data processing tasks of the target systems are gradually migrated to the real-time pipelines and the real-time performance of the system in processing multi-source heterogeneous data is improved; the problem of non-uniform permissions across multiple target systems is solved by unifying permissions with Ranger; and the delay of model mapping is optimized by dynamically adjusting the polling interval.
Drawings
FIG. 1 is a flow chart of a processing method supporting multi-source heterogeneous data according to the present invention.
Detailed Description
The following description is presented to enable one of ordinary skill in the art to make and use the invention. The preferred embodiments in the following description are by way of example only and other obvious variations will occur to those skilled in the art. The basic principles of the invention defined in the following description may be applied to other embodiments, variations, modifications, equivalents, and other technical solutions without departing from the spirit and scope of the invention.
It will be understood that the terms "a" and "an" should be interpreted as referring to "at least one" or "one or more," i.e., in one embodiment, the number of elements may be one, while in another embodiment, the number of elements may be plural, and the term "a" should not be interpreted as limiting the number.
Embodiment one:
Referring to FIG. 1, the technical solution provided by the invention is a processing method and system supporting multi-source heterogeneous data, comprising the following steps:
Step one: collecting data from different target systems, adding an identification tag of the corresponding target system to each piece of data, defining a global policy, and unifying the permissions of each target system.
This step addresses the fact that conventional ETL tools cannot process certain real-time data (e.g., sensor data), which easily leads to delays in inventory reporting, and that, when permissions are managed across systems, the permission models of the ERP, WMS and logistics systems are inconsistent, which increases the risk of improper data access.
Wherein defining the global policy comprises:
registering the target systems in the console of the security management framework Ranger and importing the resource hierarchy of each target system, wherein the resource hierarchy comprises tables and columns; selecting the corresponding resources according to the resource hierarchy of each target system, and designating the operations corresponding to the resources, including allow operations and deny operations.
Wherein unifying the rights of each target system comprises:
Mapping the resources of each target system to the unified identifiers of Ranger; converting the native operations of each target system into the standard operations of Ranger; mapping the permission model of each target system into a unified "resource-operation-condition" model; and creating global roles and associating them with the corresponding models to bind the resources of the corresponding target systems, wherein the conditions comprise the IP range and effective duration of an operation.
Mapping the rights model of each target system into a unified "resource-operation-condition" model, comprising:
synchronizing the clocks of Ranger and the target systems using the NTP protocol;
this reduces clock synchronization error and ensures that the time difference ΔT is calculated accurately.
Setting an expansion plug-in for the authority model of each target system, and realizing the change of the authority model by utilizing the polling of the expansion plug-in;
The polling interval of the expansion plug-in is dynamically adjusted to ensure that the permission model of the target system can be mapped in time.
For example, in the process of generating the SPD inventory turnover rate report, after the permissions are unified, daily inventory snapshots can be conveniently extracted from the WMS (MySQL) and inventory records can be extracted from the logistics system (MongoDB);
then the average daily inventory is calculated as (initial inventory + ending inventory)/2, the monthly totals are aggregated by drug ID, the results are written into the inventory_turn table of the data warehouse (Snowflake), and a line chart is configured in Tableau to show the turnover-rate trend of each drug.
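A minimal sketch of this aggregation step, assuming pandas is available and using hypothetical column names (drug_id, opening_qty, closing_qty, snapshot_date, issue_qty, issue_date); the turnover formula (monthly outbound quantity divided by average inventory) is a standard definition assumed for illustration, and the load into Snowflake and the Tableau configuration are omitted:

```python
import pandas as pd

def monthly_inventory_turnover(snapshots: pd.DataFrame, issues: pd.DataFrame) -> pd.DataFrame:
    """Aggregate daily WMS snapshots and logistics issue records into monthly turnover rows.

    snapshots: one row per drug per day with opening_qty / closing_qty (datetime snapshot_date).
    issues:    one outbound record per row with issue_qty (datetime issue_date).
    All column names are illustrative assumptions, not the patent's schema.
    """
    snapshots = snapshots.copy()
    issues = issues.copy()
    # Average daily inventory = (initial inventory + ending inventory) / 2
    snapshots["avg_daily_inventory"] = (snapshots["opening_qty"] + snapshots["closing_qty"]) / 2
    snapshots["month"] = snapshots["snapshot_date"].dt.to_period("M")
    issues["month"] = issues["issue_date"].dt.to_period("M")

    monthly_inv = snapshots.groupby(["drug_id", "month"])["avg_daily_inventory"].mean()
    monthly_out = issues.groupby(["drug_id", "month"])["issue_qty"].sum()

    # Turnover = monthly outbound quantity / average inventory (assumed definition).
    turn = (monthly_out / monthly_inv).rename("turnover_rate").reset_index()
    return turn  # rows to be loaded into the Snowflake inventory_turn table
```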
In this embodiment, the polling interval of the extension plugin is dynamically adjusted to ensure that the permission model of the target system can be mapped in time, including the following steps:
recording the starting time of each model mapping, returning to the mapping completion time after the authority model completes the mapping, dynamically calculating a new polling interval through a sliding window weighted average delay and a proportional integral control algorithm, monitoring the delay, and replacing the current polling interval with the new polling interval to adjust the polling interval so as to ensure that the authority model of the target system can be mapped in time.
Wherein dynamically calculating a new polling interval by a sliding window weighted average delay and proportional-integral control algorithm comprises:
after each authority model mapping is completed, calculating the time difference ΔT = T_end − T_start, where T_end denotes the time at which the mapping is completed and T_start denotes the start time of the model mapping;
calculating the weighted average delay D_avg = Σ_{i=1..N} w_i·ΔT_{t−i}, where N denotes the size of the sliding window, w_i denotes the weight coefficients, and ΔT_{t−i} denotes the delay at the i-th time point before the current time t;
calculating the control variable u = K_p1·(D_avg − D_tgt) + K_i1·Σ_j (D_avg,j − D_tgt), where D_tgt denotes the target delay threshold, K_i1 denotes integral gain one, K_p1 denotes proportional gain one, and D_avg,j denotes the average delay of the j-th sliding window;
calculating the new polling interval T_new = min(T_cur + u, T_max), where T_cur denotes the current polling interval and T_max denotes the maximum value of the polling interval;
if ΔT exceeds a preset delay upper bound, replacing the current polling interval directly with a preset fallback interval and skipping the calculation of the control variable.
For example, the completion time T_end, the start time T_start and the time difference ΔT of each mapping are recorded in seconds. If the delays of three successive mappings are 15 seconds, 18 seconds and 20 seconds, the weighted average delay D_avg and the control variable u are calculated from these values and a new polling interval is obtained; if the mapping delay after the adjustment drops to 8 seconds, the next round of calculation updates the polling interval again from the new delay.
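A minimal sketch of this adjustment loop, assuming uniform window weights w_i, the additive interval update reconstructed above, and illustrative gains and thresholds (none of these constants are taken from the patent):

```python
from collections import deque

class PollingIntervalController:
    """PI control of the extension plug-in's polling interval, driven by mapping delay."""

    def __init__(self, interval_s: float = 30.0, target_delay_s: float = 10.0,
                 max_interval_s: float = 300.0, window: int = 3,
                 kp: float = 0.5, ki: float = 0.1) -> None:
        self.interval_s = interval_s
        self.target_delay_s = target_delay_s
        self.max_interval_s = max_interval_s
        self.delays = deque(maxlen=window)      # sliding window of mapping delays ΔT
        self.window_means: list[float] = []     # per-window average delays for the integral term
        self.kp, self.ki = kp, ki

    def on_mapping_finished(self, t_start: float, t_end: float) -> float:
        delta_t = t_end - t_start                               # ΔT = T_end - T_start
        self.delays.append(delta_t)
        weights = [1.0 / len(self.delays)] * len(self.delays)   # uniform weights w_i (assumption)
        d_avg = sum(w * d for w, d in zip(weights, self.delays))
        self.window_means.append(d_avg)
        # u = K_p1*(D_avg - D_tgt) + K_i1 * sum_j (D_avg_j - D_tgt)
        u = self.kp * (d_avg - self.target_delay_s) \
            + self.ki * sum(m - self.target_delay_s for m in self.window_means)
        # T_new = min(T_cur + u, T_max), with a 1-second floor added here for safety
        self.interval_s = min(max(1.0, self.interval_s + u), self.max_interval_s)
        return self.interval_s
```

Calling on_mapping_finished once per completed authority-model mapping mirrors the per-mapping recalculation described above.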
Step two: constructing a plurality of real-time pipelines parallel to the ETL path to process the collected data, writing data that fail parsing into an error data area and generating warning information, and writing the collected data into a temporary storage area after removing the data that fail parsing, where the successfully parsed data form a correct data set and the failed data form an error data set; the data in the correct data set are divided into a plurality of sub-data sets by a clustering algorithm according to data structure type, so that the quantity of data of each structure is known.
Step two comprises the following substeps:
combining an open source stream processing platform Kafka with a stream data processing engine Flink to construct a real-time pipeline, wherein a plurality of real-time pipelines form a stream data processing system so as to process a plurality of data streams;
if the target system only supports batch file transmission, setting a batch file transmission plug-in in Kafka and pulling the data from the temporary storage area;
after the acquired data set is cleaned, data is acquired in real time through a sliding time window, so that a data stream is formed;
writing the data stream into a preset target storage area and locking it, unlocking the target storage area after Flink completes the state checkpoint, and outputting the data;
Maintaining the original ETL path of the target system;
And constructing a gradual switching strategy, controlling the process of switching the ETL to the real-time pipeline, and gradually switching the ETL data processing task of the target system to the real-time pipeline. The method comprises the following steps:
Quantifying risk indexes in the switching process, wherein the risk indexes comprise the data difference rate before and after switching, the real-time pipeline processing delay time, the Flink checkpoint failure rate and the downstream report generation error rate.
In this embodiment, when evaluating data consistency, the risk is judged to be high if the difference rate between the outputs of the original system and the real-time pipeline is greater than 1%; when evaluating system timeliness, the risk is judged to be high if the Flink event-time lag (the real-time pipeline processing delay) exceeds 5 minutes.
And writing data into the Kafka and the original ETL path at the same time, comparing the output of the ETL path with the output of the real-time pipeline, and stopping switching if the data difference rate of the two outputs is greater than a preset difference rate threshold.
And switching the uncore task data to a real-time pipeline, wherein the core task data is still transmitted through the original ETL path.
Detecting the transmission quantity of the core task data; if the transmission quantity of the core task data is lower than a preset transmission quantity threshold within a preset time period, switching the core task data to the real-time pipeline while keeping the original ETL path for a reserved period of at least one month.
When the real-time pipeline processing delay time exceeds a preset delay time threshold, cutting off the real-time pipeline, and automatically selecting an original ETL path to transmit data.
For example, if the real-time stream delay exceeds 10 minutes, the real-time pipeline is automatically cut off and the system switches back to ETL until the delay recovers. This may be accomplished by embedding the fault-tolerance library Resilience4j or the circuit-breaker and degradation component Hystrix into the Flink job.
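A framework-agnostic sketch of the delay-based fallback decision described in this example; the patent suggests embedding Resilience4j or Hystrix in the Flink job, whereas the plain-Python monitor below only illustrates the decision rule, with the 10-minute cut-off taken from the example and the 5-minute recovery window an added assumption:

```python
import time

DELAY_THRESHOLD_S = 10 * 60  # 10-minute delay threshold from the example above

class RouteSelector:
    """Chooses between the real-time pipeline and the original ETL path."""

    def __init__(self) -> None:
        self.route = "realtime"
        self.recovered_since: float | None = None

    def observe_delay(self, pipeline_delay_s: float, now: float | None = None) -> str:
        now = time.time() if now is None else now
        if pipeline_delay_s > DELAY_THRESHOLD_S:
            # Cut off the real-time pipeline and fall back to the original ETL path.
            self.route = "etl"
            self.recovered_since = None
        elif self.route == "etl":
            # Switch back only after the delay has stayed below the threshold for a while.
            self.recovered_since = self.recovered_since or now
            if now - self.recovered_since > 5 * 60:  # assumed 5-minute recovery window
                self.route = "realtime"
        return self.route
```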
The timeliness of medical SPD service can be improved by constructing a real-time pipeline through Kafka and Flink, and meanwhile, the risk of switching can be reduced by adopting a double-line transmission and progressive switching mode.
Step three: writing the output of the real-time data processing system and/or the output of the ETL path into a data warehouse;
Step four: creating a visual dynamic report by utilizing the data in the data warehouse, analyzing and predicting the data of the dynamic report with a pre-trained machine learning model, and writing the prediction results into the dynamic report. This comprises the following sub-steps:
Acquiring data of different target systems from a data warehouse, and creating a dynamic report by using a BI tool Tableau;
Loading a machine learning model, inputting data of different target systems to generate corresponding prediction results, and writing the prediction results into a dynamic report, wherein the machine learning model comprises one or more of a time sequence prediction model LSTM, a classification model XGBoost and a linear regression model, and the method specifically comprises the following steps:
Acquiring data from a data warehouse, and identifying the data of different target systems through identification tags of the target systems;
Feature values are extracted from the data of the different target systems and then standardized to form target system feature sets; for example, the drug purchase quantity and the standard deviation of the daily average inventory over the past 7 days can form a time-series feature set, and the historical on-time delivery rate of a supplier can form a risk-classification feature set of the purchasing and supply system.
Elements of the target system feature set are input as variables into the machine learning model to output a prediction result; for example, the inventory demand over a given period can be output by the LSTM model and used to generate an inventory demand report, the default probability of a supplier can be output by the classification model, and default early-warning data are added to the SPD report.
And writing the predicted result into a data warehouse, reading the predicted result from the data warehouse by using a BI tool, writing the predicted result into a dynamic report, and updating the dynamic report.
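A minimal sketch of these prediction sub-steps for the supplier-risk branch, assuming scikit-learn and XGBoost are available and using hypothetical feature and column names; the LSTM inventory-demand branch and the Tableau refresh are omitted:

```python
import pandas as pd
from sklearn.preprocessing import StandardScaler
from xgboost import XGBClassifier

def supplier_risk_predictions(warehouse_rows: pd.DataFrame, model: XGBClassifier) -> pd.DataFrame:
    """Build the risk-classification feature set and score supplier default risk.

    warehouse_rows: rows read from the data warehouse, already filtered by the
    target-system identification tag (e.g. source_system == "procurement").
    Feature and column names are illustrative assumptions.
    """
    features = warehouse_rows[["on_time_delivery_rate", "avg_lead_time_days", "order_count_90d"]]
    # Standardization step; for illustration only - in practice the scaler fitted
    # during model training would be reused here.
    scaled = StandardScaler().fit_transform(features)
    scored = warehouse_rows.copy()
    scored["default_probability"] = model.predict_proba(scaled)[:, 1]
    # These rows would be written back to the warehouse, from which the BI tool
    # (Tableau) reads them and refreshes the dynamic report.
    return scored[["supplier_id", "default_probability"]]
```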
Traditional reports depend on static rules and cannot adapt to dynamic business changes (such as fluctuations in drug demand caused by emergencies); this embodiment predicts inventory demand and supplier risk through the LSTM/XGBoost models, reducing dependence on manual experience and improving prediction accuracy.
Embodiment two:
The difference between this embodiment and the first embodiment is that in the first embodiment the global policy defined through Ranger is an explicit, resource-based authorization manner, whereas this embodiment gives an implicit, tag-based authorization.
The method specifically comprises the following steps:
creating a business semantic tag, and associating the business semantic tag with resources of different target systems;
and identifying global role information, distributing different service semantic tags for different global roles, and accessing corresponding resources by the global roles through the corresponding service semantic tags.
In this embodiment, the business semantic tag is set to "query service tag", the operation enabled by the tag is set to the "read" operation, the global role is "statistics", and the business semantic tag is associated with the systems (HDFS, Hive, Kafka, etc.), so that the system resources can be accessed through the service tag.
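A schematic sketch of this tag-based authorization, expressed as plain data structures rather than Ranger's actual policy API; the tag, role and resource names follow the example above, while the evaluation logic is a simplification for illustration:

```python
# Hypothetical, simplified tag-based policy model; real deployments would define
# this through the Ranger console or its REST API rather than in application code.
TAG_POLICIES = {
    "query service tag": {"allowed_operations": {"read"}, "roles": {"statistics"}},
}
RESOURCE_TAGS = {
    "hive:spd.inventory_turn": {"query service tag"},
    "kafka:topic.wms-snapshots": {"query service tag"},
}

def is_allowed(role: str, resource: str, operation: str) -> bool:
    """Grant access if any tag on the resource permits this role and operation."""
    for tag in RESOURCE_TAGS.get(resource, set()):
        policy = TAG_POLICIES.get(tag)
        if policy and role in policy["roles"] and operation in policy["allowed_operations"]:
            return True
    return False

assert is_allowed("statistics", "hive:spd.inventory_turn", "read")
```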
Embodiment III:
The difference between the present embodiment and the first embodiment is that in the first embodiment, in the switching process of the ETL and the real-time pipeline, the switching control is performed by comparing the difference rate of the data output by the ETL and the real-time pipeline with the difference rate threshold. The difference rate threshold is fixed and needs to be manually set, and cannot adapt to the dynamically changing data processing process. Therefore, in this embodiment, a dynamic control switching strategy is provided, that is, by constructing a dynamic feedback system, the migration flow rate ratio is adaptively adjusted in real time according to the behavior difference between the ETL and the real-time pipeline system. The method comprises the following specific steps:
acquiring real-time pipeline output and ETL path output;
Calculating the relative entropy of real-time pipeline output and ETL path output:
D_KL(P||Q) = Σ_x P(x)·log(P(x)/Q(x)), wherein P represents the probability distribution of the key data in the real-time pipeline output, Q represents the probability distribution of the same key data in the ETL output, and D_KL denotes the Kullback-Leibler divergence.
The switching process is dynamically adjusted by a PID controller, comprising the sub-steps of:
The proportion r(t) of data flowing into the real-time data processing system is set as:
r(t) = K_p2·e(t) + K_i2·∫e(τ)dτ + K_d·de(t)/dt;
wherein e(t) denotes the control error determined from the relative entropy D_KL(P||Q); K_p2 denotes proportional gain two, K_i2 denotes integral gain two, and K_d denotes the differential gain; the proportional term reflects the sensitivity of the PID controller to the current difference, the integral term reflects the correction capability of the PID controller for long-term deviation, and the differential term reflects the controller's judgment of the trend of the difference, reducing the value of r(t) when the difference suddenly increases.
Setting a sliding time window II, collecting the output data of the real-time pipeline and the ETL path in real time, and calculating P and Q;
calculating, from P and Q, the change Δr in the proportion of data flowing into the real-time pipeline after one sliding time window II, wherein W2 denotes the length of sliding time window II;
dynamically optimizing K_p2, K_i2 and K_d through the reinforcement learning algorithm PPO, comprising the following sub-steps:
setting a state space S = {e(t), I(t), Δe(t)},
wherein the integral term I(t) = ∫e(τ)dτ and the differential term Δe(t) = de(t)/dt;
setting an action space A = {ΔK_p2, ΔK_i2, ΔK_d};
wherein ΔK_p2 denotes the adjustment amount of proportional gain two,
ΔK_i2 denotes the adjustment amount of integral gain two,
and ΔK_d denotes the adjustment amount of the differential gain;
setting a reward function R = −(λ1·|e(t)| + λ2·|Δr(t)| + λ3·σ);
wherein λ1 represents the weight of the error, λ2 represents the penalty on the change of the control quantity, and λ3 represents the overshoot penalty;
the overshoot σ = r(t) − r(t−1), wherein r(t−1) denotes the proportion of data flowing into the real-time data processing system at the time point previous to the current time t.
Constructing the reinforcement learning algorithm with an Actor-Critic framework, and updating the elements of the state space with the elements of the action space, so as to realize the updating of K_p2, K_i2 and K_d;
the updating with the elements of the action space is specifically: K_p2' = K_p2 + ΔK_p2, K_i2' = K_i2 + ΔK_i2, K_d' = K_d + ΔK_d, wherein K_p2', K_i2' and K_d' represent the updated proportional gain two, integral gain two and differential gain. This dynamic adjustment is illustrated in the code sketch that follows the example procedure below.
For example, an implementation procedure of this embodiment may be:
Setting an initial flow ratio r(0) and initial values of K_p2, K_i2 and K_d;
starting the PID controller, calculating the error and updating r(t) once at every fixed control interval;
if, within a period of time, the fluctuation amplitude of r(t) is larger than a preset fluctuation amplitude threshold or the fluctuation frequency exceeds a preset fluctuation frequency threshold, starting the reinforcement learning model to optimize K_p2, K_i2 and K_d;
when the relative entropy remains below its threshold and the proportion r(t) remains stable for longer than 24 hours, the migration is judged to be completed and the switching control task is terminated.
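A compact sketch of the feedback loop of this embodiment, combining the relative-entropy calculation with the PID update of the flow ratio; the error definition, initial gains, thresholds and the direction of the ratio update follow the reconstruction above, and the PPO optimizer is reduced to a plain gain-adjustment hook (a full Actor-Critic implementation is beyond the scope of this sketch):

```python
import numpy as np

def kl_divergence(p_counts: np.ndarray, q_counts: np.ndarray, eps: float = 1e-9) -> float:
    """D_KL(P || Q) between histograms of the same key data from the two paths."""
    p = np.clip(p_counts / p_counts.sum(), eps, None)
    q = np.clip(q_counts / q_counts.sum(), eps, None)
    return float(np.sum(p * np.log(p / q)))

class MigrationController:
    """PID control of the proportion r(t) of data routed to the real-time pipeline."""

    def __init__(self, kp: float = 0.5, ki: float = 0.05, kd: float = 0.1,
                 initial_ratio: float = 0.1, kl_target: float = 0.05) -> None:
        self.kp, self.ki, self.kd = kp, ki, kd
        self.kl_target = kl_target
        self.integral = 0.0
        self.prev_error = 0.0
        self.ratio = initial_ratio  # assumed initial flow ratio

    def step(self, kl: float, dt: float = 1.0) -> float:
        # Error: how far the divergence between the two outputs exceeds the acceptable level.
        error = kl - self.kl_target
        self.integral += error * dt
        derivative = (error - self.prev_error) / dt
        self.prev_error = error
        delta = self.kp * error + self.ki * self.integral + self.kd * derivative
        # Large divergence -> positive delta -> shrink the share routed to the real-time pipeline.
        self.ratio = min(1.0, max(0.0, self.ratio - delta))
        return self.ratio

    def adjust_gains(self, d_kp: float, d_ki: float, d_kd: float) -> None:
        """Hook for the PPO optimizer: gains are updated additively, K' = K + ΔK."""
        self.kp += d_kp
        self.ki += d_ki
        self.kd += d_kd
```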
The scheme of this embodiment shortens the delay before a policy takes effect through PID self-adaptation, reduces the average delay, can flexibly cope with burst traffic or policy changes, optimizes the delay before the global policy takes effect, and improves switching efficiency.
The invention also provides a processing system supporting the multi-source heterogeneous data, which comprises a server, a processor, a communication module, a data acquisition interface and a memory, wherein the communication module is in communication connection with the processor, the processor is connected with the server through the communication module, and the system is used for executing the processing method supporting the multi-source heterogeneous data.
The processes described above with reference to the flowcharts may be implemented as computer software programs in accordance with the disclosed embodiments of the invention. Embodiments of the present disclosure include a computer program product comprising a computer program embodied on a computer readable medium, the computer program comprising program code for performing the method shown in the flowchart. In such embodiments, the computer program may be downloaded and installed from a network via a communication portion, and/or installed from a removable medium. The above-described functions defined in the method of the present invention are performed when the computer program is executed by a central processing unit (CPU). The computer readable medium of the present invention may be a computer readable signal medium or a computer readable storage medium, or any combination of the two. The computer readable storage medium can be, for example, but not limited to, an electronic, magnetic, optical, electromagnetic, infrared, or semiconductor system, apparatus, or device, or a combination of any of the foregoing. More specific examples of a computer-readable storage medium may include, but are not limited to, an electrical connection having one or more wires, a portable computer diskette, a hard disk, a random access memory (RAM), a read-only memory (ROM), an erasable programmable read-only memory (EPROM or flash memory), an optical fiber, a portable compact disc read-only memory (CD-ROM), an optical storage device, a magnetic storage device, or any suitable combination of the foregoing. In the context of this document, a computer readable storage medium may be any tangible medium that can contain or store a program for use by or in connection with an instruction execution system, apparatus, or device. In the present invention, however, a computer-readable signal medium may include a data signal propagated in baseband or as part of a carrier wave, with computer-readable program code embodied therein. Such a propagated data signal may take any of a variety of forms, including, but not limited to, electromagnetic, optical, or any suitable combination of the foregoing. A computer readable signal medium may also be any computer readable medium that is not a computer readable storage medium and that can communicate, propagate, or transport a program for use by or in connection with an instruction execution system, apparatus, or device. Program code embodied on a computer readable medium may be transmitted using any appropriate medium, including but not limited to wireless, wireline, optical fiber cable, RF, etc., or any suitable combination of the foregoing.
The flowcharts and block diagrams in the figures illustrate the architecture, functionality, and operation of possible implementations of systems, methods and computer program products according to various embodiments of the present invention. In this regard, each block in the flowchart or block diagrams may represent a module, segment, or portion of code, which comprises one or more executable instructions for implementing the specified logical function(s). It should also be noted that, in some alternative implementations, the functions noted in the block may occur out of the order noted in the figures. For example, two blocks shown in succession may, in fact, be executed substantially concurrently, or the blocks may sometimes be executed in the reverse order, depending upon the functionality involved. It will also be noted that each block of the block diagrams and/or flowchart illustration, and combinations of blocks in the block diagrams and/or flowchart illustration, can be implemented by special purpose hardware-based systems which perform the specified functions or acts, or combinations of special purpose hardware and computer instructions.
It will be understood by those skilled in the art that the embodiments of the present invention described above and shown in the drawings are merely illustrative and not restrictive of the current invention, and that this invention has been shown and described with respect to the functional and structural principles thereof, without departing from such principles, and that any changes or modifications may be made to the embodiments of the invention without departing from such principles.
Claims (9)
1. A method of processing supporting multi-source heterogeneous data, the method comprising:
Collecting data from different target systems, and adding an identification tag of a corresponding target system to each piece of data;
Constructing a plurality of real-time data processing systems parallel to the ETL path to process the collected data, writing data that fail parsing into an error data area, and generating warning information;
writing the output of the real-time data processing system and/or the output of the ETL path into a data warehouse;
Creating a visual dynamic report by utilizing data in a data warehouse, analyzing and predicting the dynamic report data by a pre-trained machine learning model, and writing a prediction result into the dynamic report;
The constructing a plurality of real-time data processing systems parallel to the ETL path processes the acquired data, including:
Combining an open source stream processing platform Kafka with a stream data processing engine Flink to construct a real-time data processing system, wherein a plurality of real-time data processing systems form the stream data processing system so as to process a plurality of data streams;
if the target system only supports batch file transmission, setting a batch file transmission plug-in in Kafka and pulling the data from a temporary storage area;
after the acquired data set is cleaned, data is acquired in real time through a sliding time window, so that a data stream is formed;
writing the data stream into a preset target storage area and locking it, unlocking the target storage area after Flink completes the state checkpoint, and outputting the data;
Maintaining the original ETL path of the target system;
And constructing a progressive switching strategy, controlling the switching process of the ETL to the real-time data processing system, and gradually switching the ETL data processing task of the target system to the real-time data processing system.
2. The method for processing multi-source heterogeneous data according to claim 1, wherein the successfully parsed data form a correct data set and the data that fail parsing form an error data set;
Before acquiring data from different target systems, defining global strategy and unifying authority of each target system;
The defining global policy includes:
registering the target system in a console of the security management framework Ranger, and importing the resource hierarchy of the target system, wherein the resource hierarchy comprises tables and column groups;
Selecting corresponding resources according to the resource levels of the target systems;
designating operations corresponding to the resources, including an allowing operation and a rejecting operation;
The unified target system authorities comprise:
mapping the resources of each target system into the unified identifier of Ranger;
Converting the native operation of each target system into a standard operation of Ranger;
Mapping the authority model of each target system into a unified resource-operation-condition model;
Creating a global role, and associating the global role with a corresponding model to bind the resources of a corresponding target system;
The conditions include IP range and effective duration of operation;
mapping the authority model of each target system into a unified resource-operation-condition model comprises the following steps:
synchronizing the clocks of Ranger and the target system using the NTP protocol;
Setting an expansion plug-in for the authority model of each target system, and realizing the change of the authority model by utilizing the polling of the expansion plug-in;
The polling interval of the expansion plug-in is dynamically adjusted to ensure that the permission model of the target system can be mapped in time.
3. The method for processing multi-source heterogeneous data according to claim 2, wherein the dynamically adjusting the polling interval of the extension plug-in to ensure that the permission model of the target system can be mapped in time comprises:
Recording the starting time of each model mapping;
After the authority model finishes mapping, returning to the mapping completion time;
dynamically calculating a new polling interval through a sliding window weighted average delay and a proportional-integral control algorithm;
and monitoring delay, and replacing the current polling interval with a new polling interval to adjust the polling interval, so as to ensure that the permission model of the target system can be mapped in time.
4. A method of processing heterogeneous data according to claim 3, wherein dynamically calculating new polling intervals by a sliding window weighted average delay and proportional-integral control algorithm comprises:
after each authority model mapping is completed, calculating the time difference ΔT = T_end − T_start, where T_end denotes the time at which the mapping is completed and T_start denotes the start time of the model mapping;
calculating the weighted average delay D_avg = Σ_{i=1..N} w_i·ΔT_{t−i}, where N denotes the size of the sliding window, w_i denotes the weight coefficients, and ΔT_{t−i} denotes the delay at the i-th time point before the current time t;
calculating the control variable u = K_p1·(D_avg − D_tgt) + K_i1·Σ_j (D_avg,j − D_tgt), where D_tgt denotes the target delay threshold, K_i1 denotes integral gain one, K_p1 denotes proportional gain one, and D_avg,j denotes the average delay of the j-th sliding window;
calculating the new polling interval T_new = min(T_cur + u, T_max), where T_cur denotes the current polling interval and T_max denotes the maximum value of the polling interval;
if ΔT exceeds a preset delay upper bound, replacing the current polling interval directly with a preset fallback interval and skipping the calculation of the control variable.
5. The method for processing heterogeneous data according to claim 4, wherein defining the global policy further comprises:
creating a business semantic label;
associating the business semantic tags with resources of different target systems;
and identifying global role information, distributing different service semantic tags for different global roles, and accessing corresponding resources by the global roles through the corresponding service semantic tags.
6. The method for supporting heterogeneous multi-source data processing according to claim 5, wherein said constructing a gradual switching strategy to control a process of switching ETL to a real-time data processing system, gradually switching ETL data processing tasks of a target system to the real-time data processing system, comprises:
quantifying risk indexes in the switching process, wherein the risk indexes comprise the data difference rate before and after switching, the processing delay time of the real-time data processing system, the Flink checkpoint failure rate and the downstream report generation error rate;
writing data into the Kafka and the original ETL path at the same time, comparing the output of the ETL path with the output of the real-time data processing system, and stopping switching if the data difference rate of the two outputs is greater than a preset difference rate threshold;
Switching non-core task data to a real-time data processing system, wherein the core task data is still transmitted through an original ETL path;
Detecting the transmission quantity of the core task data; if the transmission quantity of the core task data is lower than a preset transmission quantity threshold within a preset time period, switching the core task data to the real-time data processing system while keeping the original ETL path for a reserved period of at least one month;
When the processing delay time of the real-time data processing system exceeds a preset delay time threshold, the real-time data processing system is cut off, and the original ETL path is automatically selected to transmit data.
7. The method for supporting multiple heterogeneous data according to claim 6, wherein said constructing a gradual switching strategy controls a process of switching ETL to a real-time data processing system, gradually switching ETL data processing tasks of a target system to the real-time data processing system, further comprising:
Acquiring real-time data processing system output and ETL path output;
Calculating the relative entropy of the real-time data processing system output and the ETL path output:
D_KL(P||Q) = Σ_x P(x)·log(P(x)/Q(x)), wherein P represents the probability distribution of the key data in the output of the real-time data processing system, Q represents the probability distribution of the same key data in the ETL output, and D_KL denotes the Kullback-Leibler divergence;
the switching process is dynamically adjusted by a PID controller, comprising the following sub-steps:
setting the proportion r(t) of data flowing into the real-time data processing system as: r(t) = K_p2·e(t) + K_i2·∫e(τ)dτ + K_d·de(t)/dt;
wherein e(t) denotes the control error determined from the relative entropy D_KL(P||Q); K_p2 denotes proportional gain two, K_i2 denotes integral gain two, and K_d denotes the differential gain;
setting a sliding time window II, collecting the output data of the real-time data processing system and the ETL path in real time, and calculating P and Q;
calculating the relative entropy from P and Q;
calculating the change Δr in the proportion of data flowing into the real-time data processing system after one sliding time window II, wherein W2 denotes the length of sliding time window II;
dynamically optimizing K_p2, K_i2 and K_d through the reinforcement learning algorithm PPO, comprising the following sub-steps:
setting a state space S = {e(t), I(t), Δe(t)}, wherein the integral term I(t) = ∫e(τ)dτ and the differential term Δe(t) = de(t)/dt;
setting an action space A = {ΔK_p2, ΔK_i2, ΔK_d};
wherein ΔK_p2 denotes the adjustment amount of proportional gain two,
ΔK_i2 denotes the adjustment amount of integral gain two,
and ΔK_d denotes the adjustment amount of the differential gain;
setting a reward function R = −(λ1·|e(t)| + λ2·|Δr(t)| + λ3·σ);
wherein λ1 represents the weight of the error, λ2 represents the penalty on the change of the control quantity, and λ3 represents the overshoot penalty;
the overshoot σ = r(t) − r(t−1), wherein r(t−1) denotes the proportion of data flowing into the real-time data processing system at the time point previous to the current time t;
constructing the reinforcement learning algorithm with an Actor-Critic framework, and updating the elements of the state space with the elements of the action space, so as to realize the updating of K_p2, K_i2 and K_d;
the updating with the elements of the action space is specifically: K_p2' = K_p2 + ΔK_p2, K_i2' = K_i2 + ΔK_i2, K_d' = K_d + ΔK_d, wherein K_p2', K_i2' and K_d' represent the updated proportional gain two, integral gain two and differential gain.
8. The method for processing heterogeneous data according to claim 7, wherein creating a visual dynamic report from data in the data warehouse, analyzing and predicting the dynamic report data by a pre-trained machine learning model, and writing the prediction result into the dynamic report, comprises:
Acquiring data of different target systems from a data warehouse, and creating a dynamic report by using a BI tool Tableau;
Loading a machine learning model, inputting data of different target systems to generate corresponding prediction results, and writing the prediction results into a dynamic report, wherein the machine learning model comprises one or more of a time sequence prediction model LSTM, a classification model XGBoost and a linear regression model, and the method specifically comprises the following steps:
Acquiring data from a data warehouse, and identifying the data of different target systems through identification tags of the target systems;
Extracting characteristic values of data of different target systems, and then carrying out standardization processing to form a target system characteristic set;
inputting elements in the target system feature set as input variables into a machine learning model, and outputting a prediction result;
and writing the predicted result into a data warehouse, reading the predicted result from the data warehouse by using a BI tool, writing the predicted result into a dynamic report, and updating the dynamic report.
9. A processing system supporting multi-source heterogeneous data, comprising a server, a processor, a plurality of data acquisition interfaces connected with the processor, a communication module and a memory, wherein the processor is connected with the server through the communication module, and the processing system is used for executing a processing method supporting multi-source heterogeneous data according to any one of claims 1-8.
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202510363080.5A CN119884229B (en) | 2025-03-26 | 2025-03-26 | A processing method and system supporting multi-source heterogeneous data |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202510363080.5A CN119884229B (en) | 2025-03-26 | 2025-03-26 | A processing method and system supporting multi-source heterogeneous data |
Publications (2)
Publication Number | Publication Date |
---|---|
CN119884229A CN119884229A (en) | 2025-04-25 |
CN119884229B true CN119884229B (en) | 2025-06-24 |
Family
ID=95421021
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202510363080.5A Active CN119884229B (en) | 2025-03-26 | 2025-03-26 | A processing method and system supporting multi-source heterogeneous data |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN119884229B (en) |
Citations (1)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN115001793A (en) * | 2022-05-27 | 2022-09-02 | 北京双湃智安科技有限公司 | Data fusion method for information security multi-source heterogeneous data |
Family Cites Families (6)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN111723160B (en) * | 2020-08-24 | 2021-03-23 | 国网浙江省电力有限公司 | Multi-source heterogeneous incremental data synchronization method and system |
US11847130B2 (en) * | 2021-05-21 | 2023-12-19 | Capital One Services, Llc | Extract, transform, load monitoring platform |
CN116166757B (en) * | 2022-12-06 | 2024-11-15 | 浪潮通用软件有限公司 | Multi-source heterogeneous lake and warehouse integrated data processing method, equipment and medium |
US20240386027A1 (en) * | 2023-05-19 | 2024-11-21 | Thermo Electron North America LLC | Flexible extract, transform, and load (etl) process |
CN117688024A (en) * | 2023-12-08 | 2024-03-12 | 国网山东省电力公司电力科学研究院 | Multi-source heterogeneous distribution electricity consumption data processing method, system, equipment and storage medium |
CN118093735A (en) * | 2024-03-04 | 2024-05-28 | 江苏中科西北星信息科技有限公司 | Method, system and medium for realizing multi-source heterogeneous data stream data warehouse |
- 2025-03-26: Application CN202510363080.5A filed in China; patent CN119884229B, status Active
Also Published As
Publication number | Publication date |
---|---|
CN119884229A (en) | 2025-04-25 |
Legal Events
Date | Code | Title | Description |
---|---|---|---
| PB01 | Publication | |
| SE01 | Entry into force of request for substantive examination | |
| GR01 | Patent grant | |