CN116795909B - Data processing method, device and computer-readable storage medium - Google Patents

Data processing method, device and computer-readable storage medium

Info

Publication number
CN116795909B
Authority
CN
China
Prior art keywords
data
performance data
result
incremental
summarizing
Prior art date
Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
Active
Application number
CN202211114115.4A
Other languages
Chinese (zh)
Other versions
CN116795909A (en
Inventor
张江梅
黄立群
陈闯
Current Assignee (The listed assignees may be inaccurate. Google has not performed a legal analysis and makes no representation or warranty as to the accuracy of the list.)
China Mobile Communications Group Co Ltd
China Mobile Suzhou Software Technology Co Ltd
Original Assignee
China Mobile Communications Group Co Ltd
China Mobile Suzhou Software Technology Co Ltd
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by China Mobile Communications Group Co Ltd and China Mobile Suzhou Software Technology Co Ltd
Priority to CN202211114115.4A
Publication of CN116795909A
Application granted
Publication of CN116795909B
Legal status: Active
Anticipated expiration

Classifications

    • G PHYSICS
    • G06 COMPUTING OR CALCULATING; COUNTING
    • G06F ELECTRIC DIGITAL DATA PROCESSING
    • G06F16/00 Information retrieval; Database structures therefor; File system structures therefor
    • G06F16/20 Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
    • G06F16/25 Integrating or interfacing systems involving database management systems
    • G06F16/254 Extract, transform and load [ETL] procedures, e.g. ETL data flows in data warehouses
    • G PHYSICS
    • G06 COMPUTING OR CALCULATING; COUNTING
    • G06F ELECTRIC DIGITAL DATA PROCESSING
    • G06F16/00 Information retrieval; Database structures therefor; File system structures therefor
    • G06F16/20 Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
    • G06F16/22 Indexing; Data structures therefor; Storage structures
    • G06F16/2228 Indexing structures
    • G PHYSICS
    • G06 COMPUTING OR CALCULATING; COUNTING
    • G06F ELECTRIC DIGITAL DATA PROCESSING
    • G06F16/00 Information retrieval; Database structures therefor; File system structures therefor
    • G06F16/20 Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
    • G06F16/23 Updating
    • G06F16/2308 Concurrency control
    • G06F16/2315 Optimistic concurrency control
    • G PHYSICS
    • G06 COMPUTING OR CALCULATING; COUNTING
    • G06F ELECTRIC DIGITAL DATA PROCESSING
    • G06F16/00 Information retrieval; Database structures therefor; File system structures therefor
    • G06F16/20 Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
    • G06F16/27 Replication, distribution or synchronisation of data between databases or within a distributed database system; Distributed database system architectures therefor

Landscapes

  • Engineering & Computer Science (AREA)
  • Theoretical Computer Science (AREA)
  • Databases & Information Systems (AREA)
  • Data Mining & Analysis (AREA)
  • Physics & Mathematics (AREA)
  • General Engineering & Computer Science (AREA)
  • General Physics & Mathematics (AREA)
  • Software Systems (AREA)
  • Computing Systems (AREA)
  • Information Retrieval, Db Structures And Fs Structures Therefor (AREA)

Abstract

An embodiment of the invention discloses a data processing method, a device, and a computer-readable storage medium. The method comprises: reading incremental performance data from a performance data queue through a stream processing program, wherein the performance data queue records network performance data acquired from at least one network element of a communication network; summarizing the incremental performance data by using current resource data in a preset database to obtain an incremental summarization result, wherein the preset database records the resource data corresponding to the at least one network element and the performance data summarization results; and updating the preset database according to the incremental summarization result. The method and device improve the accuracy and efficiency of data processing.

Description

Data processing method, device and computer readable storage medium
Technical Field
The present invention relates to the field of communications, and in particular, to a data processing method, apparatus, and computer readable storage medium.
Background
The telecommunications industry generates massive amounts of communication performance data every day; the raw communication performance data collected daily by a network management system exceeds forty thousand records. Summarizing the collected raw data in real time and feeding the statistics back to network personnel helps ensure the safe and reliable operation of the communication network.
In the related art, one approach introduces a message mechanism to reduce the time lost between data acquisition, reporting, and summarization. However, in a communication report system based on a message mechanism, the message module connects the acquisition, summarization, and other modules in series, so the robustness of the whole system depends heavily on the normal operation of the message module, and overall robustness degrades once the message mechanism is introduced. Another approach sets a timer for reported data: when the timer expires, the data reported so far is summarized, and data that could not be reported in time is processed in the next period. Although setting a timeout reduces the delay between the completion of data acquisition and the completion of summarization, the system must poll until the timer expires to determine whether all data for the period has been reported, which consumes considerable resources. For data reported after the timeout, the system must also maintain logic for judging the period and for handling data whose reporting is delayed by several time periods.
Disclosure of Invention
The invention mainly provides a data processing method, a data processing device and a computer readable storage medium, which improve the accuracy and efficiency of data processing.
The technical scheme of the invention is realized as follows:
An embodiment of the invention provides a data processing method, which comprises the following steps:
reading incremental performance data from a performance data queue through a stream processing program, wherein the performance data queue is used for recording network performance data acquired from at least one network element of a communication network;
summarizing the incremental performance data by using current resource data in a preset database to obtain an incremental summarization result, wherein the preset database is used for recording the resource data corresponding to the at least one network element and the performance data summarization results;
and updating the preset database according to the incremental summarization result.
In the above scheme, updating the preset database according to the incremental summarization result includes:
generating an index corresponding to the incremental summarization result according to attribute information of the incremental summarization result stored in a preset update queue, wherein the attribute information comprises at least one of a network element name, a summarization time dimension, a summarization space dimension, and a time node of the summarized data;
and matching the index against a preset index corresponding to the at least one network element, and updating the preset database based on the matching result.
In the above scheme, the matching result is either a matching failure or a matching success;
updating the preset database based on the matching result includes:
if the matching fails, inserting the incremental summarization result and the index into the preset database, and inserting the index into a preset index library corresponding to the preset database, to complete the update of the preset database, wherein the preset index library is used for providing index information for querying the preset database;
if the matching succeeds, acquiring the target performance data summarization result corresponding to the successfully matched target preset index;
and merging the target performance data summarization result with the incremental summarization result to obtain a merged summarization result, and updating the target performance data summarization result with the merged summarization result, to complete the update of the preset database.
In the above scheme, before summarizing the incremental performance data by using the current resource data in the preset database to obtain the incremental summarization result, the method further includes:
acquiring original resource data corresponding to the at least one network element from the preset database through the stream processing program;
and performing data extraction on the original resource data to obtain a resource data list, wherein the resource data list comprises the current resource data corresponding to each network element.
In the above scheme, summarizing the incremental performance data by using the current resource data in the preset database to obtain the incremental summarization result includes:
performing at least one numerical integration and summarization operation on the incremental performance data according to each item of current resource data in the resource data list, to obtain the incremental summarization result.
In the above scheme, before the incremental performance data is read from the performance data queue through the stream processing program, the method further includes:
collecting network performance data corresponding to the at least one network element;
and standardizing the network performance data and recording it in the performance data queue.
In the above scheme, the performance data queue is a message queue, and reading the incremental performance data from the performance data queue through the stream processing program includes:
acquiring the recorded historical read position through the stream processing program;
and determining the incremental data portion of the performance data queue based on the historical read position, and reading it to obtain the incremental performance data.
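The position-based incremental read described above can be sketched as follows. This is a minimal in-memory illustration under stated assumptions, not the patented implementation: `PerformanceQueue`, `IncrementalReader`, and `poll` are hypothetical names, and a Python list stands in for the message queue's append-only log.

```python
class PerformanceQueue:
    """Hypothetical in-memory stand-in for an append-only message queue."""

    def __init__(self):
        self._log = []

    def append(self, record):
        self._log.append(record)

    def read_from(self, position):
        # Return every record at or after the given position.
        return self._log[position:]


class IncrementalReader:
    """Remembers the last read position so each poll returns only new data."""

    def __init__(self, queue):
        self._queue = queue
        self.position = 0  # recorded historical read position

    def poll(self):
        batch = self._queue.read_from(self.position)
        self.position += len(batch)  # advance past what was just read
        return batch
```

Because the reader stores only a position, a second `poll` with no new records returns an empty list rather than re-reading earlier data.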
In the above scheme, the incremental summarization result is an incremental performance data summarization result corresponding to a first preset time period, and the method further includes:
summarizing the incremental performance data according to a second preset time period to obtain an initial incremental summarization result, wherein the second preset time period is shorter than the first preset time period;
and each time an initial incremental summarization result is obtained, summarizing the at least one initial incremental summarization result obtained within the first preset time period, to obtain the incremental summarization result.
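The two-level summarization above can be illustrated with a short sketch. It assumes summation as the aggregation and integer timestamps in seconds; `rollup` and the 900-second and 3600-second periods are illustrative choices, not values fixed by the source.

```python
from collections import defaultdict


def rollup(partials, period_seconds):
    """Sum partial results whose timestamps fall in the same larger period.

    partials: iterable of (timestamp_seconds, value) pairs, one per
    shorter-period (for example 15-minute) initial result.
    """
    totals = defaultdict(float)
    for timestamp, value in partials:
        bucket_start = timestamp - timestamp % period_seconds
        totals[bucket_start] += value
    return dict(totals)


# Three 15-minute results: two fall in the first hour, one in the second.
hourly = rollup([(0, 1.0), (900, 2.0), (3600, 4.0)], period_seconds=3600)
```

Calling `rollup` again each time a new shorter-period result arrives keeps the longer-period result current without waiting for the longer period to end.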
An embodiment of the invention provides a data processing device, which comprises a reading unit, a summarizing unit, and an updating unit, wherein:
the reading unit is used for reading incremental performance data from a performance data queue through a stream processing program, wherein the performance data queue is used for recording network performance data acquired from at least one network element of a communication network;
the summarizing unit is used for summarizing the incremental performance data by using the current resource data in a preset database to obtain an incremental summarization result, wherein the preset database is used for recording the resource data corresponding to the at least one network element and the performance data summarization results;
and the updating unit is used for updating the preset database according to the incremental summarization result.
An embodiment of the present invention provides a data processing apparatus, including:
a memory for storing executable instructions;
and a processor for executing the executable instructions stored in the memory, wherein the processor performs the data processing method when the executable instructions are executed.
An embodiment of the present invention provides a storage medium storing executable instructions that, when executed, cause a processor to perform the data processing method according to the embodiments of the present invention.
An embodiment of the invention provides a data processing method, a device, and a computer-readable storage medium. The method comprises: reading incremental performance data from a performance data queue through a stream processing program, wherein the performance data queue is used for recording network performance data acquired from at least one network element of a communication network; summarizing the incremental performance data by using current resource data in a preset database to obtain an incremental summarization result, wherein the preset database is used for recording the resource data corresponding to the at least one network element and the performance data summarization results; and updating the preset database according to the incremental summarization result. In this scheme, a message queue and a stream processing program are used to summarize the incremental data uploaded in batches to the file system, and the performance data in the database is updated according to the summarization result and the corresponding index file. As a result, real-time statistics of the performance data reported on the current day can be observed in every dimension, the risk of losing data accuracy because of delayed reporting is avoided, and the accuracy and efficiency of data processing are improved.
Drawings
FIG. 1 is a schematic flowchart of a data processing method according to an embodiment of the present invention;
FIG. 2 is a flowchart of the scheme framework of a near-real-time communication report data system according to an embodiment of the present invention;
FIG. 3 is a flowchart of incremental summarization according to an embodiment of the present invention;
FIG. 4 is a flowchart of updating an index cluster according to an embodiment of the present invention;
FIG. 5 is a flowchart of incremental data extraction according to an embodiment of the present invention;
FIG. 6 is a data flow diagram of performance data summarization according to an embodiment of the present invention;
FIG. 7 is a schematic structural diagram of a data processing apparatus according to an embodiment of the present invention;
FIG. 8 is a schematic structural diagram of another data processing apparatus according to an embodiment of the present invention.
Detailed Description
The technical solutions in the embodiments of the present invention are described clearly and completely below with reference to the accompanying drawings. The described embodiments are only some, not all, of the embodiments of the present invention. All other embodiments obtained by those skilled in the art based on these embodiments without inventive effort fall within the scope of protection of the present invention.
In order to better understand the aspects of the present invention, the present invention will be described in further detail with reference to the accompanying drawings and detailed description. Fig. 1 is a schematic flow chart of a data processing method according to an embodiment of the present invention, which will be specifically described with reference to the following steps.
S101, reading incremental performance data from a performance data queue through a stream processing program, wherein the performance data queue is used for recording network performance data acquired from at least one network element of the communication network.
In an embodiment of the invention, the data processing device reads incremental performance data from a performance data queue in a stream processing mode, wherein the performance data queue is used for recording network performance data acquired from at least one network element in a communication network.
In the embodiment of the invention, under the stream data processing mode, data arrives continuously, and the system processes newly arrived data promptly and continuously produces output. Processed data is generally discarded, although it may also be saved. The stream processing mode emphasizes processing speed, in part because data is generated very quickly and must be handled in a timely manner. Because a stream data processing system can process newly arrived data promptly, it can give decision makers the latest trends so that they can respond to emergencies and adjust countermeasures in time. In batch processing, by contrast, data is first collected continuously and saved to a database, and then analyzed (including by SQL queries). Batch processing is suitable for processing large amounts of data (high volume), but the final result is available only after the entire analysis task has completed. Depending on the size of the data set and the computing power of the computer system, the whole process can take considerable time; that is, the delay before the final analysis result is obtained is large. Batch processing is the most common data processing mode: traditional relational database systems and big data processing platforms such as Hadoop and Spark adopt it wholly or mainly. Because the entire data set must be stored in full and analyzed in place, batch processing requires more hardware resources than a stream data processing system.
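The contrast between the two modes can be made concrete with a small sketch. The names `RunningMean` and `batch_mean` are hypothetical and the mean is only an example statistic; the point is that the streaming form yields a result after every arrival, while the batch form needs the whole data set first.

```python
class RunningMean:
    """Streaming style: update the result as each value arrives."""

    def __init__(self):
        self.count = 0
        self.total = 0.0

    def update(self, value):
        self.count += 1
        self.total += value
        return self.total / self.count  # an up-to-date result after every value


def batch_mean(values):
    """Batch style: the result exists only after all values are collected."""
    return sum(values) / len(values)
```

Both forms agree once all values have been seen; the streaming form additionally keeps only two numbers in memory instead of the full data set.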
In embodiments of the present invention, the queue is a special linear table, and the data elements of the queue are also referred to as queue elements. Inserting a queue element into a queue is referred to as enqueuing and removing a queue element from the queue is referred to as dequeuing. Because a queue is only allowed to be inserted at one end and deleted at the other end, only the element that first enters the queue can be deleted from the queue first, so the queue is also known as a first-in-first-out linear table.
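A minimal illustration of first-in-first-out behavior, using Python's standard `collections.deque`; the sample values are placeholders.

```python
from collections import deque

queue = deque()
queue.append("sample-1")  # enqueue at the tail
queue.append("sample-2")
queue.append("sample-3")
first_out = queue.popleft()  # dequeue from the head: the oldest element
```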
In an embodiment of the present invention, the incremental performance data is the data newly added in the current period relative to the previous period. A network element consists of one or more boards or chassis and can independently perform certain transmission functions.
S102, summarizing the incremental performance data by using current resource data in a preset database to obtain an incremental summarization result, wherein the preset database is used for recording the resource data corresponding to the at least one network element and the performance data summarization results.
In the embodiment of the invention, the data processing device performs summarization processing on the incremental performance data according to the current resource data stored in the preset database to obtain an incremental summarization result, and then stores the incremental summarization result into the preset database.
In the embodiment of the present invention, the current resource data may be the resource data collected on the same day, which is not limited by the present invention.
In an embodiment of the invention, the summarization processing comprises at least one of summing the data, averaging the data, taking the maximum, and taking the minimum.
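The four operations named above can be sketched in a few lines. `summarize` is a hypothetical helper, and the dictionary keys are illustrative.

```python
def summarize(values):
    """Return the sum, average, maximum, and minimum of a non-empty list."""
    if not values:
        raise ValueError("values must not be empty")
    total = sum(values)
    return {
        "sum": total,
        "avg": total / len(values),
        "max": max(values),
        "min": min(values),
    }
```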
In an embodiment of the present invention, in contrast to the full resource data, the communication performance data is incremental data generated in real time.
S103, updating a preset database according to the increment summarization result.
In the embodiment of the invention, the data processing device merges the incremental summarization result into the preset database.
In the embodiment of the invention, FIG. 2 is a flowchart of the scheme framework of a near-real-time communication report data system. In FIG. 2, the data acquisition module acquires resource data and stores it in the database, and a full resource data extraction algorithm periodically exports the resource data in the database to the Hadoop distributed file system. The data acquisition module stores the acquired performance data in a Kafka message queue. A Flink incremental summarization algorithm summarizes the incremental performance data in the Kafka message queue together with the resource data to obtain summarization results, SolrCloud builds an index for each incremental summarization result, and finally the database is updated with the indexed summarization results. The data calculation module comprises a performance data incremental processing module and a resource data full processing module. The performance data incremental processing module uses stream processing to pull and summarize, in real time, the performance data pushed into the Kafka message queue; during summarization, the performance data indicators must be integrated according to the resource data information, which is stable and fluctuates little from day to day. The resource data full processing module extracts the resource data in the database into the Hadoop distributed file system through the full data extraction algorithm, for use in the Flink summarization.
SolrCloud in the data processing device solves the problem of inserting into the database the multiple incremental summarization results produced by incremental data summarization. Specifically, a unique index is established for the incremental summarization result of each space dimension and time period of each network element, and an index conflict query is performed before an incremental summarization result is inserted into the database. For results without a conflict, an insert operation is performed and an index is created; for results with a conflict, an update operation is performed. Through its performance data incremental processing module and resource data full processing module, the data calculation module in the data processing device completes the summary statistics of the raw data and provides the data source for the whole report. SolrCloud in the data processing device associates each established index with the corresponding incremental summarization result in the database, and manages the data generated by the data calculation module by creating, modifying, and deleting indexes. The data acquisition module in the data processing device maintains the source data.
It can be understood that, in the embodiment of the invention, a message queue and a stream processing program are used to summarize the incremental data uploaded in batches to the file system, and the performance data in the database is updated according to the summarization result and the corresponding index file. As a result, real-time statistics of the performance data reported on the current day can be observed in every dimension, the risk of losing data accuracy because of delayed reporting is avoided, and the accuracy and efficiency of data processing are improved.
In the embodiment of the present invention, S103 can be realized by S1031 to S1032, which will be specifically described in connection with the following steps.
S1031, generating an index corresponding to the increment summary result according to attribute information of the increment summary result stored in a preset update queue, wherein the attribute information comprises at least one of a network element name, a summary time dimension, a summary space dimension and a time node of summary data.
In the embodiment of the invention, the data processing device acquires the data waiting to be updated from the preset update queue, and generates the index corresponding to the incremental summarization result according to at least one of the network element name, the summarization time dimension, the summarization space dimension, and the time node of the summarized data.
In the embodiment of the invention, the time dimension may be 15-minute, hourly, daily, or monthly granularity, and the space dimension may be a region, a data center, a resource pool, a network element, a province, a country, and the like.
In some embodiments of the present invention, the SolrCloud module in the data processing apparatus associates information about the data, such as the network element, time dimension, space dimension, and time node of the summarized data, with an index name according to a preset rule. In the association that the SolrCloud module establishes between data reports and indexes, the data of each time period of each table corresponds to one index. Under the index creation rule of the SolrCloud module, a Solr index is composed of separate documents, each document is composed of fields, and each field holds a single attribute. The association between a data report and a Solr index is as follows: each table corresponds to one Solr index, and each row of data corresponds to one document under that index. Each document is composed of a plurality of fields, and the fields correspond to the items of information about the row, including the corresponding network element, time dimension, space dimension, and time node of the summarized data.
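One way to picture the row-to-document mapping is the sketch below. It uses a plain dictionary and makes no SolrCloud client calls; the function name `build_index_document`, the field names, the `id` key, and the `|` separator are all assumptions chosen for illustration, since the source does not specify the preset rule.

```python
def build_index_document(ne_name, time_dim, space_dim, time_node):
    """Build a document whose id is unique per NE, dimensions, and time node."""
    fields = {
        "ne_name": ne_name,      # network element name
        "time_dim": time_dim,    # e.g. "15min", "hour", "day", "month"
        "space_dim": space_dim,  # e.g. "resource_pool", "province"
        "time_node": time_node,  # time node of the summarized data
    }
    order = ("ne_name", "time_dim", "space_dim", "time_node")
    doc_id = "|".join(fields[name] for name in order)
    return {"id": doc_id, **fields}
```

Two summarization results for the same network element, dimensions, and time node produce the same `id`, which is what allows a later lookup to detect a conflict.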
S1032, matching the index with a preset index corresponding to at least one network element, and updating the preset database based on the matching result.
In the embodiment of the invention, the data processing device matches the established index with the preset index corresponding to at least one network element, and updates the preset database by using the increment summarization result according to the matching result.
In some embodiments of the present invention, FIG. 3 is a flowchart of incremental summarization provided by an embodiment of the present invention, implemented through steps Sa to Sf. In FIG. 3, the incremental data summarization task starts; Flink reads the full data file from the Hadoop distributed file system and generates a table; Flink reads the performance data from the Kafka message queue and processes it into a table; the performance data is summarized using the resource data information in the full data file; the summarized data is written into the database; and the incremental data summarization task ends.
In some embodiments of the present invention, FIG. 4 is a flowchart of updating an index cluster according to an embodiment of the present invention, implemented through steps SA to SH. In FIG. 4, the processing program starts; the summarization result is inserted into the update queue; the data waiting in the update queue is acquired; the corresponding index is generated in the Solr cluster (SolrCloud) according to the network element name of the summarized data and information such as the summarization space and time dimensions and the time node; whether an index conflict exists is determined; if an index conflict exists, a database update operation is executed; if no index conflict exists, the summarization result and the corresponding index are inserted into the database; and the task ends.
In some embodiments of the present invention, S1032 may be implemented through S201 to S203, which will be specifically described in connection with the following steps.
S201, if the matching fails, inserting the increment summary result and the index into a preset database, inserting the index into a preset index database corresponding to the preset database, and finishing updating the preset database, wherein the preset index database is used for providing index information for inquiring the preset database.
In an embodiment of the present invention, the matching result is either a matching failure or a matching success. When the matching fails, that is, the index query finds that no index corresponding to the incremental summarization result exists in the preset database, the data processing device inserts the incremental summarization result and the generated index into the preset database, and at the same time inserts the generated index into the preset index library corresponding to the preset database, thereby completing the update of the preset database and the preset index library.
S202, if the matching is successful, acquiring a target performance data summarizing result corresponding to a target preset index which is successfully matched.
In the embodiment of the invention, if the matching succeeds, that is, the index query finds that the index of the incremental summarization result is identical to the index of a record already in the preset database, the data processing device acquires from the preset database the target performance data summarization result corresponding to the successfully matched target preset index.
S203, combining the target performance data summarization result and the increment summarization result to obtain a combined summarization result, and updating the target performance data summarization result by utilizing the combined summarization result to finish updating the preset database.
In the embodiment of the invention, after the data processing device acquires the target performance summary result, the data processing device performs data combination on the target performance summary result and the increment summary result to obtain a combined summary result, and updates the target performance data summary result by using the combined summary result to finish updating the preset database.
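Steps S201 to S203 amount to an insert-or-merge operation keyed by the index. The sketch below is one possible reading under stated assumptions: a dictionary stands in for the preset database, a set stands in for the preset index library, and each summary stores `count`, `sum`, `max`, and `min` so that two summaries can be combined exactly. The names `merge_results` and `upsert` and the summary layout are hypothetical; the source does not define the merge rule.

```python
def merge_results(existing, delta):
    """Combine two summaries that each hold count, sum, max, and min."""
    return {
        "count": existing["count"] + delta["count"],
        "sum": existing["sum"] + delta["sum"],
        "max": max(existing["max"], delta["max"]),
        "min": min(existing["min"], delta["min"]),
    }


def upsert(database, index_library, key, delta):
    """Insert on a failed index match, merge and update on a successful one."""
    if key not in index_library:  # matching failed: no such index yet
        database[key] = dict(delta)
        index_library.add(key)
        return "inserted"
    # Matching succeeded: merge with the stored target result.
    database[key] = merge_results(database[key], delta)
    return "updated"
```

Storing the sum and count rather than the average is a design choice of this sketch: an average can be derived as `sum / count` after any number of merges, whereas two averages alone cannot be merged correctly.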
In the embodiment of the present invention, S301 to S302 are further included before S102, and will be specifically described by the following steps.
S301, obtaining original resource data corresponding to at least one network element from a preset database through a stream processing program.
In the embodiment of the invention, the data processing device acquires the acquired original resource data corresponding to at least one network element from a preset database through a stream processing program.
S302, extracting data from the original resource data to obtain a resource data list, wherein the resource data list comprises current resource data corresponding to each network element.
In the embodiment of the invention, the data processing device performs data extraction on the original resource data to obtain a resource data list containing the current resource data corresponding to each network element.
In some embodiments of the present invention, the data processing apparatus may acquire the resource data list from the preset database periodically every day.
In some embodiments of the present invention, as shown in fig. 5, which is a flowchart of incremental data extraction provided in an embodiment of the present invention, the extraction is implemented through Sg to Sk. In fig. 5, the scheduled task starts, the required full resource data is read from the database, the full resource data is written to a local data file, the file is imported into the Hadoop distributed file system, and the task ends.
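The extraction flow of fig. 5 may be sketched as follows, for illustration only. SQLite stands in for the preset database, and the table name `resource`, its columns, and the function names are hypothetical. The sketch only builds the `hdfs dfs -put` command and does not run it, so it can be tried without a Hadoop installation.

```python
import csv
import sqlite3
from pathlib import Path
from typing import List

# Hypothetical column layout of the resource table.
COLUMNS = ["ne_name", "vim", "dc", "province"]


def export_resource_data(conn: sqlite3.Connection, local_file: Path) -> int:
    """Read the full resource data and write it to a local data file."""
    rows = conn.execute(
        "SELECT ne_name, vim, dc, province FROM resource"
    ).fetchall()
    with local_file.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow(COLUMNS)
        writer.writerows(rows)
    return len(rows)


def hdfs_put_command(local_file: Path, hdfs_dir: str) -> List[str]:
    """Build the command that imports the file into HDFS (-f overwrites)."""
    return ["hdfs", "dfs", "-put", "-f", str(local_file), hdfs_dir]
```

A scheduler such as cron could invoke the two steps once a day and pass the returned command to `subprocess.run`.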
In some embodiments of the present invention, S102 may be implemented by S1021, which will be specifically described in connection with the following steps.
S1021, performing at least one numerical integration and summarization process on the incremental performance data according to each item of current resource data in the resource data list to obtain an incremental summarization result.
In some embodiments of the present invention, the data processing apparatus determines the corresponding incremental performance data according to each item of current resource data in the resource data list, and performs at least one numerical integration and summarization process on the corresponding incremental performance data to obtain an incremental summarization result.
In some embodiments of the present invention, the at least one numerical integration and summarization process includes data summation, averaging, taking the maximum and the minimum, and the like.
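As a minimal sketch of S1021, the incremental performance data may be grouped by the network elements present in the resource data list and then summed, averaged, and reduced to a maximum and a minimum. The field names `ne_name` and `value` and the function name `summarize` are assumptions made for illustration.

```python
from collections import defaultdict


def summarize(resource_list, records):
    """Summarize performance records for each network element in the resource list."""
    known = {item["ne_name"] for item in resource_list}
    grouped = defaultdict(list)
    for rec in records:
        # Only records that correspond to a current resource item are summarized.
        if rec["ne_name"] in known:
            grouped[rec["ne_name"]].append(rec["value"])
    return {
        ne: {
            "sum": sum(values),
            "avg": sum(values) / len(values),
            "max": max(values),
            "min": min(values),
        }
        for ne, values in grouped.items()
    }
```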
In some embodiments of the present invention, S401 to S402 are further included before S101, and the following steps will be specifically described.
S401, collecting network performance data corresponding to at least one network element.
In some embodiments of the present invention, the data processing device collects network performance data corresponding to at least one network element.
In some embodiments of the present invention, the data processing apparatus may collect network performance data corresponding to at least one network element from the operation and maintenance center.
S402, the network performance data is normalized and recorded in a performance data queue.
In some embodiments of the present invention, the data processing device performs normalization processing on the collected network performance data, so as to obtain normalized performance data, and records the normalized performance data in a performance data queue.
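The embodiment does not define the normalization in detail. One possible reading, sketched below under that assumption, is that vendor-specific field names, value types, and time formats are mapped to a single record layout before the record is placed in the queue. The raw keys (`ne`, `kpi`, `val`, `time`) are hypothetical, and a `deque` stands in for the performance data queue.

```python
from collections import deque
from datetime import datetime, timezone

# In-memory stand-in for the performance data queue.
performance_queue = deque()


def normalize(raw: dict) -> dict:
    """Map a raw record to a uniform layout (names, float value, epoch seconds)."""
    ts = datetime.fromisoformat(raw["time"])
    if ts.tzinfo is None:
        # Assumption: timestamps without a zone are treated as UTC.
        ts = ts.replace(tzinfo=timezone.utc)
    return {
        "ne_name": raw["ne"].strip().lower(),
        "metric": raw["kpi"].strip().lower(),
        "value": float(raw["val"]),
        "timestamp": int(ts.timestamp()),
    }


def record(raw: dict) -> None:
    """Normalize a collected record and append it to the queue."""
    performance_queue.append(normalize(raw))
```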
In some embodiments of the present invention, S101 may be implemented through S1011 to S1012, which will be specifically described in connection with the following steps.
S1011, acquiring the recorded historical read data position through a stream processing program.
In some embodiments of the invention, the data processing apparatus obtains the recorded historical read data position.
In some embodiments of the present invention, the Kafka message queue creates a log file for each partition, and the file stores the data in that partition. The data processing device periodically appends the acquired performance data to the end of the log file, and each piece of data has its own offset. The Flink incremental summarization algorithm records in real time which part of the performance data is to be summarized.
S1012, determining and reading an incremental data part from the performance data queue based on the historical read data position to obtain incremental performance data.
In some embodiments of the invention, the data processing apparatus determines and reads the incremental data part from the performance data queue based on the historical read data position to obtain the incremental performance data.
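The offset-based reading of S1011 to S1012 may be illustrated with an in-memory, append-only log in place of a Kafka partition. This is a simplified sketch that does not use the Kafka or Flink APIs; the class names `PartitionLog` and `IncrementalReader` are hypothetical.

```python
class PartitionLog:
    """Append-only log in which each record has its own offset."""

    def __init__(self):
        self._records = []

    def append(self, record) -> int:
        # New data is appended to the end; the returned offset is its position.
        self._records.append(record)
        return len(self._records) - 1

    def read_from(self, offset: int) -> list:
        return self._records[offset:]


class IncrementalReader:
    """Remembers the historical read position and returns only newer data."""

    def __init__(self, log: PartitionLog):
        self._log = log
        self._next_offset = 0  # recorded historical read data position

    def read_increment(self) -> list:
        batch = self._log.read_from(self._next_offset)
        self._next_offset += len(batch)
        return batch
```

Each call to `read_increment` returns only the records appended since the previous call, which corresponds to the incremental data part.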
In the embodiment of the present invention, when the incremental summary result is an incremental performance data summary result corresponding to the first preset time period, the data processing method further includes S501 to S502, which will be specifically described in connection with the following steps.
S501, summarizing the incremental performance data according to a second preset time period to obtain an initial incremental summarizing result, wherein the second preset time period is smaller than the first preset time period.
In some embodiments of the present invention, after obtaining an incremental performance data summary result corresponding to a first preset time period, the data processing apparatus performs summary processing on incremental performance data in a second preset time period, to obtain an initial incremental summary result, where the second preset time period is smaller than the first preset time period.
S502, each time an initial increment summarization result is obtained, summarizing on the basis of the at least one initial increment summarization result obtained so far in the first preset time period, to obtain the increment summarization result.
In some embodiments of the present invention, the second preset time period is less than the first preset time period, i.e., the second preset time period is of a different time dimension than the first preset time period. The data processing apparatus may aggregate the incremental performance data in at least one time dimension based on the time attribute information in the incremental performance data. And when the initial increment summary result corresponding to the new second preset time period is obtained in the first preset time period, continuing to carry out summary processing based on at least one initial increment summary result currently obtained in the first preset time period until the summary of the initial increment summary results corresponding to all the second preset time periods in the first preset time period is completed.
It should be noted that, in the embodiment of the present invention, the data processing apparatus may further perform at least one spatial dimension summary on the incremental performance data according to the spatial attribute information in the incremental performance data. That is, the data processing apparatus may perform the aggregation processing on the incremental performance data in at least one of the time dimension and the space dimension to obtain the incremental aggregation result.
In some embodiments of the present invention, as shown in fig. 6, fig. 6 is a data flow chart of a performance data summary provided in an embodiment of the present invention. Fig. 6 shows how the data processing apparatus summarizes data in 4 time dimensions and 6 space dimensions. The 4 time dimensions include, from low to high, 15-minute granularity (denoted by q in fig. 6), hour granularity (denoted by h in fig. 6), day granularity (denoted by d in fig. 6), and month granularity (denoted by m in fig. 6). The 6 space dimensions include a large area (space), a data center (dc), a resource pool/virtualized infrastructure manager (vim), a network element (ne), a province (province), and a country (country). Taking the 15-minute granularity in fig. 6 as an example, the data processing apparatus performs the summary of the 6 space dimensions in the 15-minute time dimension and can obtain space_q (15-minute large area range summary result), dc_q (15-minute data center range summary result), vim_q (15-minute resource pool range summary result), ne_q (15-minute network element range summary result), province_q (15-minute province range summary result), and country_q (15-minute country range summary result). The other time dimensions follow by analogy.
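The naming scheme of fig. 6 (space dimension, underscore, time dimension) can be illustrated with a short sketch that produces the six 15-minute results from one batch of records. Summation is used as the summary operation, and the record layout is an assumption; the function name `summarize_spatial` is hypothetical.

```python
from collections import defaultdict

# The six space dimensions named in fig. 6.
SPACE_DIMS = ("space", "dc", "vim", "ne", "province", "country")


def summarize_spatial(records, time_suffix="q"):
    """Return one table per space dimension, e.g. ne_q, dc_q, province_q."""
    results = {f"{dim}_{time_suffix}": defaultdict(float) for dim in SPACE_DIMS}
    for rec in records:
        for dim in SPACE_DIMS:
            # Each record contributes to the entity it belongs to in every dimension.
            results[f"{dim}_{time_suffix}"][rec[dim]] += rec["value"]
    return {name: dict(table) for name, table in results.items()}
```

Passing `time_suffix="h"` to hourly records would yield `ne_h`, `dc_h`, and so on.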
In the related art, the hour-granularity summary is performed only after all 15-minute-granularity summaries within the hour are completed, the day-granularity summary is performed only after all hour-granularity summary tasks within the day are completed, and so on. It can be seen that the summary method of the related art has a relatively large time delay and cannot meet the requirement of real-time query in a high time dimension. In the embodiment of the invention, when a low-dimensional summary result, such as a 15-minute summary result, is generated, the low-dimensional summary result can be stored in a Kafka message queue, the low-dimensional summary result is obtained from the Kafka message queue through a stream processing program, and the high-dimensional data summary is started in real time to obtain the high-dimensional data summary result. Here, the low-dimensional summary result may be an initial increment summary result corresponding to the second preset time period, and the high-dimensional summary result may be a current increment summary result corresponding to the first preset time period.
Illustratively, when the first 15-minute network element range summary result ne_q is generated, the data processing apparatus stores the 15-minute network element range summary result in the Kafka message queue, and at the same time starts the summary flow of the next higher time dimension, that is, the hour dimension. The data processing device acquires ne_q from the Kafka message queue through a stream processing program, and when a new ne_q appears within the hour, the new ne_q and the ne_q already acquired within the hour are summarized in the time dimension to obtain the current increment summary result ne_h corresponding to the hour. Meanwhile, the summary of the space dimensions can be started on the basis of ne_q over a space range, such as the resource pool range, to obtain the 15-minute summary result of the corresponding space range, such as province_q.
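The real-time promotion from the 15-minute dimension to the hour dimension may be sketched as follows. Each arriving ne_q immediately updates the running ne_h of the hour that contains it, instead of waiting for all four quarters. Summation, epoch-second timestamps, and the class name `HourlyRollup` are assumptions of this sketch.

```python
from collections import defaultdict


class HourlyRollup:
    """Keeps a running hour-level result that is updated on every 15-minute result."""

    def __init__(self):
        self._ne_h = defaultdict(float)

    def on_quarter_result(self, ne_name: str, quarter_start: int, value: float) -> float:
        # Align the 15-minute window start (epoch seconds) to its hour.
        hour_start = quarter_start - quarter_start % 3600
        key = (ne_name, hour_start)
        self._ne_h[key] += value
        # The current ne_h is available right away for real-time queries.
        return self._ne_h[key]
```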
According to the method, data summarization in the resource pool dimension, the province dimension and the network element dimension is started in real time, so that real-time statistics of the performance data reported during the day can be observed in each dimension, the risk of data precision loss caused by delayed data reporting is avoided, and the accuracy and efficiency of data processing are improved.
The embodiment of the invention provides a data processing method, which is specifically described by combining the following steps.
S1, respectively acquiring resource data and performance data by a data processing device, and storing the resource data and the performance data into a preset database and a preset message queue.
S2, the data processing device extracts resource data from a preset database by using a preset algorithm and stores the resource data into a preset distributed system.
In the embodiment of the present invention, S2 may be implemented by S2.1 to S2.4, which will be specifically described in conjunction with the following.
S2.1, setting a timing task by the data processing device, and executing at fixed time.
S2.2, the data processing device is connected with a preset database, and the latest resource data are read.
S2.3, the data processing device writes the resource data information extracted from the preset database into the file.
S2.4, the data processing device uploads the file stored with the resource data to the Hadoop distributed file system.
S3, the data processing device uses a preset summarizing algorithm to summarize the collected performance data and the resource data, and obtains a summarizing result.
In some embodiments of the present invention, S3 may be implemented by S3.1 to S3.4, which will be specifically described in connection with the following steps.
S3.1, the data processing device starts a stream processing program.
S3.2, the data processing device reads the full resource data information from the Hadoop distributed file system and registers the full resource data information as a table.
S3.3, the data processing device reads the performance data from a preset message queue.
S3.4, the data processing device uses the resource data to summarize the performance data, and obtains a summarizing result.
S4, the data processing device establishes an index for the summarized result.
In the embodiment of the present invention, S4 may be implemented by S4.1 to S4.4, and will be specifically described in connection with the following steps.
S4.1, after the data processing device acquires the summarized result, it starts an index establishment program.
S4.2, the data processing device inserts the summarized result into an update queue.
S4.3, the data processing device acquires the data waiting for updating.
S4.4, the data processing device generates a corresponding index ID from the preset cluster according to the network element name of the summarized data, the summary space dimension, the summary time dimension, the time node of the summarized data, and other information.
S5, the data processing device judges whether index conflict exists, if yes, S6 is executed, and if not, S7 is executed.
S6, the data processing device extracts the original index file and performs data combination in the database.
S7, the data processing device inserts the summarized results and the indexes corresponding to the summarized results into the database, and inserts the indexes corresponding to the summarized results into the index database.
In the embodiment of the invention, the data processing device updates the report in the preset database by utilizing the summarized result after the index is established, and stores the report in the preset index file according to the index format. When index conflict is encountered, the original index file is extracted, and data combination, namely data updating operation is carried out.
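Steps S4.4 to S7 may be sketched as follows. The embodiment obtains the index ID from a preset cluster. As an assumption made only for this illustration, the ID is instead derived locally by hashing the four attributes, so that the same network element, dimensions, and time node always yield the same ID. The names `make_index_id` and `apply_update` are hypothetical, and a plain sum stands in for the data combination.

```python
import hashlib


def make_index_id(ne_name: str, space_dim: str, time_dim: str, time_node: str) -> str:
    """Derive a deterministic index ID from the summary attributes."""
    key = "|".join((ne_name, space_dim, time_dim, time_node))
    return hashlib.sha256(key.encode("utf-8")).hexdigest()


def apply_update(database: dict, index_library: set, attrs: dict, value: float) -> str:
    """S5: check for an index conflict. S6: merge. S7: insert result and index."""
    index_id = make_index_id(**attrs)
    if index_id in index_library:
        # S6: conflict, so combine with the stored result.
        database[index_id]["value"] += value
        return "merged"
    # S7: no conflict, so insert into the database and the index library.
    database[index_id] = {**attrs, "value": value}
    index_library.add(index_id)
    return "inserted"
```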
It can be understood that in the embodiment of the invention, the incremental data uploaded to the file system in batches is summarized by adopting the message queue and the stream processing program, and the performance data in the database is updated according to the summarized result and the corresponding index file, so that the real-time statistical result of the performance data reported in the day can be observed in each dimension, the risk of data precision loss caused by data delay reporting is avoided, and the accuracy and efficiency of data processing are improved.
An embodiment of the present invention provides a data processing apparatus. As shown in fig. 7, which is a schematic structural diagram of the data processing apparatus provided in the embodiment of the present invention, the data processing apparatus 7 includes a reading unit 701, a summarizing unit 702 and an updating unit 703, where:
The reading unit 701 is configured to read incremental performance data from a performance data queue through a stream processing program, where the performance data queue is configured to record network performance data collected from at least one network element of a communication network;
The summarizing unit 702 is configured to summarize the incremental performance data by using current resource data in a preset database to obtain an incremental summarizing result, where the preset database is used to record the summarizing result of the resource data and the performance data corresponding to the at least one network element;
the updating unit 703 is configured to update the preset database according to the delta summary result.
In some embodiments of the present invention, the updating unit 703 is further configured to generate an index corresponding to the incremental summary result according to attribute information of the incremental summary result stored in the preset update queue, where the attribute information includes at least one of a network element name, a summary time dimension, a summary space dimension, and a time node of summary data, match the index with a preset index corresponding to the at least one network element, and update the preset database based on a matching result.
In some embodiments of the present invention, the matching result includes a matching failure and a matching success. The updating unit 703 is further configured to: if the matching fails, insert the increment summary result and the index into the preset database and insert the index into a preset index library corresponding to the preset database to complete updating the preset database, where the preset index library is configured to provide index information for querying the preset database; if the matching is successful, obtain a target performance data summary result corresponding to the target preset index that is successfully matched; and combine the target performance data summary result and the increment summary result to obtain a combined summary result, and update the target performance data summary result by using the combined summary result to complete updating the preset database.
In some embodiments of the present invention, the reading unit 701 is further configured to obtain, by using a stream processing program, original resource data corresponding to the at least one network element from a preset database before performing summary processing on the incremental performance data by using current resource data in the preset database to obtain an incremental summary result, and perform data extraction on the original resource data to obtain a resource data list, where the resource data list includes current resource data corresponding to each network element.
In some embodiments of the present invention, the summarizing unit 702 is further configured to perform at least one numerical integration and summarizing process on the incremental performance data according to each current resource data in the resource data list, so as to obtain the incremental summarizing result.
In some embodiments of the present invention, the data processing apparatus further includes a recording unit 704, where the recording unit 704 is configured to collect network performance data corresponding to at least one network element, and normalize the network performance data and record the normalized network performance data in the performance data queue.
In some embodiments of the present invention, the reading unit 701 is further configured to obtain a recorded historical reading data location through the stream processing program, and determine an incremental data portion from the performance data queue based on the historical reading location and read the incremental performance data portion, so as to obtain the incremental performance data.
In some embodiments of the present invention, the incremental summary result is an incremental performance data summary result corresponding to a first preset time period, the summary unit 702 is further configured to perform summary processing on the incremental performance data according to a second preset time period, to obtain an initial incremental summary result, where the second preset time period is smaller than the first preset time period, and perform summary processing based on at least one initial incremental summary result obtained in the first preset time period to obtain the incremental summary result in case that one initial incremental summary result is obtained.
It can be understood that in the implementation scheme of the device, the message queue and the stream processing program are adopted to summarize the incremental data uploaded to the file system in batches, and the performance data in the database is updated according to the summarized results and the corresponding index files, so that real-time statistics of the performance data reported during the day can be observed in each dimension, the risk of data precision loss caused by delayed data reporting is avoided, and the accuracy and efficiency of data processing are improved.
Based on the method of the foregoing embodiments, an embodiment of the present invention further provides a data processing apparatus. As shown in fig. 8, which is a schematic structural diagram of the data processing apparatus, the apparatus includes a processor 801 and a memory 802, where the memory 802 stores one or more programs executable by the processor 801, and when the one or more programs are executed, the processor 801 performs the data processing method of the foregoing embodiments.
The embodiment of the invention provides a computer readable storage medium which stores executable instructions for realizing the data processing method when being executed by a processor.
It will be appreciated by those skilled in the art that embodiments of the present invention may be provided as a method, system, or computer program product. Accordingly, the present invention may take the form of a hardware embodiment, a software embodiment, or an embodiment combining software and hardware aspects. Furthermore, the present invention may take the form of a computer program product embodied on one or more computer-usable storage media (including, but not limited to, magnetic disk storage, optical storage, and the like) having computer-usable program code embodied therein.
The present invention is described with reference to flowchart illustrations and/or block diagrams of methods, apparatus (systems) and computer program products according to embodiments of the invention. It will be understood that each flow and/or block of the flowchart illustrations and/or block diagrams, and combinations of flows and/or blocks in the flowchart illustrations and/or block diagrams, can be implemented by computer program instructions. These computer program instructions may be provided to a processor of a general purpose computer, special purpose computer, embedded processor, or other programmable data processing apparatus to produce a machine, such that the instructions, which execute via the processor of the computer or other programmable data processing apparatus, create means for implementing the functions specified in the flowchart flow or flows and/or block diagram block or blocks.
These computer program instructions may also be stored in a computer-readable memory that can direct a computer or other programmable data processing apparatus to function in a particular manner, such that the instructions stored in the computer-readable memory produce an article of manufacture including instruction means which implement the function specified in the flowchart flow or flows and/or block diagram block or blocks.
These computer program instructions may also be loaded onto a computer or other programmable data processing apparatus to cause a series of operational steps to be performed on the computer or other programmable apparatus to produce a computer implemented process such that the instructions which execute on the computer or other programmable apparatus provide steps for implementing the functions specified in the flowchart flow or flows and/or block diagram block or blocks.
The foregoing description is only of the preferred embodiments of the present invention, and is not intended to limit the scope of the present invention.

Claims (10)

1. A method of data processing, comprising:
Reading incremental performance data from a performance data queue by a stream processing program, wherein the performance data queue is used for recording network performance data acquired from at least one network element of a communication network;
Summarizing the incremental performance data by utilizing current resource data in a preset database to obtain an incremental summarizing result, wherein the preset database is used for recording the summarizing result of the resource data and the performance data corresponding to the at least one network element;
Updating the preset database according to the increment summarization result;
wherein, the updating the preset database according to the increment summarization result comprises:
Generating an index corresponding to the increment summary result according to attribute information of the increment summary result stored in a preset update queue, wherein the attribute information comprises at least one of a network element name, a summary time dimension, a summary space dimension and a time node of summary data;
And matching the index with a preset index corresponding to the at least one network element, and updating the preset database based on a matching result.
2. The method of claim 1, wherein the matching result includes a matching failure and a matching success;
Based on the matching result, updating the preset database comprises the following steps:
If the matching fails, inserting the increment summarization result and the index into the preset database, and inserting the index into a preset index library corresponding to the preset database to finish updating the preset database, wherein the preset index library is used for providing index information for inquiring the preset database;
If the matching is successful, acquiring a target performance data summarizing result corresponding to a target preset index which is successfully matched;
And combining the target performance data summarizing result with the increment summarizing result to obtain a combined summarizing result, and updating the target performance data summarizing result by utilizing the combined summarizing result to finish updating the preset database.
3. The method according to any one of claims 1-2, wherein, before summarizing the incremental performance data using current resource data in a preset database to obtain an incremental summarized result, the method further comprises:
acquiring original resource data corresponding to the at least one network element from the preset database through a stream processing program;
and extracting the data of the original resource data to obtain a resource data list, wherein the resource data list comprises current resource data corresponding to each network element.
4. The method of claim 3, wherein summarizing the incremental performance data using current resource data in a preset database to obtain an incremental summary result, comprising:
And according to each item of current resource data in the resource data list, performing at least one numerical integration and summarization treatment on the incremental performance data to obtain the incremental summarization result.
5. The method of claim 1, wherein prior to reading incremental performance data from the performance data queue by the stream processing program, the method further comprises:
collecting network performance data corresponding to at least one network element;
And carrying out standardization processing on the network performance data, and recording the network performance data in the performance data queue.
6. The method of claim 1, wherein the performance data queue is a message queue, and wherein the reading incremental performance data from the performance data queue by the stream processing program comprises:
acquiring recorded historical reading data positions through the stream processing program;
and determining an incremental data part from the performance data queue based on the historical reading position and reading to obtain the incremental performance data.
7. The method of claim 1, wherein the delta summary results comprise current delta performance data summary results corresponding to a first preset time period, the method further comprising:
Summarizing the incremental performance data according to a second preset time period to obtain an initial incremental summarizing result, wherein the second preset time period is smaller than the first preset time period;
and under the condition that one initial increment summarizing result is obtained, summarizing at least one initial increment summarizing result currently obtained in the first preset time period, and obtaining the increment summarizing result.
8. A data processing device, characterized by comprising a reading unit, a summarizing unit and an updating unit;
The reading unit is used for reading incremental performance data from a performance data queue through a stream processing program, wherein the performance data queue is used for recording network performance data acquired from at least one network element of a communication network;
The summarizing unit is used for summarizing the incremental performance data by utilizing the current resource data in a preset database to obtain an incremental summarizing result, wherein the preset database is used for recording the resource data and performance data summarizing result corresponding to the at least one network element;
the updating unit is used for updating the preset database according to the increment summarization result;
The updating unit is further used for generating an index corresponding to the increment summarization result according to attribute information of the increment summarization result stored in a preset updating queue, wherein the attribute information comprises at least one of a network element name, a summarization time dimension, a summarization space dimension and a time node of summarization data;
The updating unit is further configured to match the index with a preset index corresponding to the at least one network element, and update the preset database based on a matching result.
9. A data processing apparatus, comprising:
A memory for storing executable instructions;
a processor for implementing the data processing method of any one of claims 1-7 when executing executable instructions stored in said memory.
10. A computer readable storage medium storing executable instructions which, when executed, are adapted to cause a processor to perform the data processing method of any one of claims 1-7.
CN202211114115.4A 2022-09-14 2022-09-14 Data processing method, device and computer-readable storage medium Active CN116795909B (en)

Priority Applications (1)

Application Number Priority Date Filing Date Title
CN202211114115.4A CN116795909B (en) 2022-09-14 2022-09-14 Data processing method, device and computer-readable storage medium

Applications Claiming Priority (1)

Application Number Priority Date Filing Date Title
CN202211114115.4A CN116795909B (en) 2022-09-14 2022-09-14 Data processing method, device and computer-readable storage medium

Publications (2)

Publication Number Publication Date
CN116795909A CN116795909A (en) 2023-09-22
CN116795909B true CN116795909B (en) 2025-08-26

Family

ID=88048621

Family Applications (1)

Application Number Title Priority Date Filing Date
CN202211114115.4A Active CN116795909B (en) 2022-09-14 2022-09-14 Data processing method, device and computer-readable storage medium

Country Status (1)

Country Link
CN (1) CN116795909B (en)

Families Citing this family (1)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN119135559A (en) * 2024-11-13 2024-12-13 深圳市优网科技有限公司 A preprocessing device and method for multi-dimensional communication index data

Citations (1)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN114238516A (en) * 2021-12-21 2022-03-25 浙江太美医疗科技股份有限公司 Data synchronization method, system and computer readable medium

Family Cites Families (2)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
US9292567B2 (en) * 2007-12-12 2016-03-22 Oracle International Corporation Bulk matching with update
US9009709B2 (en) * 2010-03-16 2015-04-14 Salesforce.Com, Inc. Asynchronous rollup numbers forecasting methods and systems

Also Published As

Publication number Publication date
CN116795909A (en) 2023-09-22

Similar Documents

Publication Publication Date Title
CN109918349B (en) Log processing method, log processing device, storage medium and electronic device
CN107038162B (en) Real-time data query method and system based on database log
US10204147B2 (en) System for capture, analysis and storage of time series data from sensors with heterogeneous report interval profiles
CN111125260A (en) A data synchronization method and system based on SQL Server
CN112507029B (en) Data processing system and data real-time processing method
US8095690B2 (en) Machine-readable medium for storing a stream data processing program and computer system
CN110795499B (en) Cluster data synchronization method, device, equipment and storage medium based on big data
CN110795428A (en) Time sequence data storage method and time sequence database applied to industrial Internet of things
CN113282611B (en) Method, device, computer equipment and storage medium for synchronizing stream data
CN109739818B (en) A convenient high-throughput big data collection method and system
CN112559475B (en) Data real-time capturing and transmitting method and system
CN106325984B (en) Big data task scheduling device
CN111222089B (en) Data processing method, data processing device, computer equipment and storage medium
CN110083600A (en) Method, apparatus, computing device and storage medium for log collection processing
CN116795909B (en) Data processing method, device and computer-readable storage medium
CN113760950B (en) Index data query method, device, electronic equipment and storage medium
CN113220530B (en) Data quality monitoring method and platform
US20120323924A1 (en) Method and system for a multiple database repository
CN112328702A (en) Data synchronization method and system
CN108763323A (en) Meteorological lattice point file application process based on resource set and big data technology
CN115952200A (en) Multi-source heterogeneous data aggregation query method and device based on MPP (massively parallel processing) architecture
CN119149092A (en) Mobile application platform fusion base development method and device
WO2025123878A1 (en) Data synchronization verification method and apparatus, and electronic device
CN113360576A (en) Power grid mass data real-time processing method and device based on Flink Streaming
CN104199930B (en) System and method for data acquisition and processing

Legal Events

Date Code Title Description
PB01 Publication
SE01 Entry into force of request for substantive examination
GR01 Patent grant