Doris-based device data storage method for the industrial Internet
Technical Field
The invention relates to the technical field of data storage, and in particular to a Doris-based device data storage method for the industrial Internet.
Background
With the development of the industrial Internet and the Internet of things, the volume of data collected from equipment keeps growing, and the requirements on real-time performance, reliability and horizontal scalability keep rising.
Existing schemes mainly include writing data directly into a relational database, big-data storage based on the Hadoop ecosystem, combining message middleware with a database, and dedicated time-series databases. These schemes have obvious drawbacks. A relational database easily hits a performance bottleneck under highly concurrent writes and must be sharded or scaled out, which is costly and complex to operate and maintain. The big-data scheme requires deploying Kafka, HDFS, YARN, Hive/Spark and other clusters, so the architecture is complex, operation and maintenance are expensive, HDFS multi-replica storage wastes resources, and real-time performance is poor. The message-middleware-plus-database scheme requires an additional consumer program, the consumer-side logic is complex, scalability is limited, and historical data archiving is not unified and is error-prone. A dedicated time-series database is optimized for time-series data, but cluster performance easily bottlenecks under massive concurrent writes, and TTL-style retention lacks flexibility, so it cannot meet the differentiated requirements of different devices and business scenarios. In addition, time-series databases may also suffer from data skew, query timeouts and other problems, increasing the risk of system instability.
In general, it is difficult for the prior art to combine highly concurrent writing, second-level analysis, controllable storage cost and flexible partition management, so a new method for storing device-collected data is needed to overcome the above drawbacks.
Disclosure of Invention
The invention aims to solve the problems of write-performance bottlenecks, complex system architecture, high storage cost and inflexible partition management in device data storage in the prior art, and provides a Doris-based device data storage method for the industrial Internet that achieves highly concurrent writing, real-time query and automatic data archiving management.
In order to achieve the above purpose, the present invention adopts the following technical scheme:
A Doris-based device data storage method for the industrial Internet comprises the following specific steps:
S1, acquiring industrial equipment node data, namely collecting data from field PLCs or gateway devices in real time in preparation for subsequent cleaning and storage;
S2, preprocessing the acquired data, namely performing de-duplication, alignment, denoising and standardization on the originally acquired data, and ensuring the quality and consistency of the data in subsequent warehouse entry;
S3, the preprocessed data is published to Kafka, where the distributed message queue provides buffering and decoupling, enabling parallel downstream consumption and good scalability;
S4, Doris consumes the Kafka data and stores it, namely Doris pulls the JSON data from Kafka in real time and writes it into a relational table, while dynamic partitioning provides automatic partition creation and expiration cleanup;
S5, monitoring and performance optimization, namely monitoring an end-to-end link from acquisition to storage, finding out bottlenecks and anomalies in time, and performing targeted optimization;
And S6, extended reuse and downstream analysis, supporting subsequent data-warehouse integration, real-time computation and machine-learning applications on the premise of ensuring data quality and real-time performance.
As a further technical solution of the present invention, the S1 specifically includes:
S11, selecting an acquisition mode, namely direct PLC acquisition or gateway-forwarded acquisition;
S111, direct PLC acquisition, namely a communication protocol such as Modbus (TCP/RTU), PROFINET or EtherNet/IP is established between the field PLC and an upper computer via Ethernet or a serial port, and the system initiates read requests to the PLC at a preset sampling frequency (for example, once per second or once every ten seconds) to obtain real-time values from designated registers or data blocks;
S112, gateway-forwarded acquisition, namely if some field devices do not support a conventional PLC protocol, an industrial edge gateway (such as an OPC UA gateway, a Modbus gateway or a Siemens S7 gateway) collects data using the device's own protocol, converts the parsed raw information (usually key-value pairs or a custom binary format) into a unified data format (such as JSON) and forwards it to the back-end system;
S12, transmission protocol and upload mode, namely MQTT upload or HTTP interface upload;
And S13, timestamp and unique-identifier handling, namely the acquisition end and the gateway use a consistent time standard, generally a millisecond-level Unix timestamp (the number of milliseconds since 1 January 1970), so that data uploaded by different devices share a unified time reference. For the unique device identifier, if a device has no globally unique ID, one can be spliced from a fixed prefix (such as the gateway number) and the device's local number on the gateway; for example, GW01_PLC05 can serve as the device's globally unique number. This ensures that devices are never confused in subsequent processing and storage.
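The timestamp and identifier conventions in S13 can be sketched as follows; the function names are illustrative assumptions, not part of the disclosure:

```python
import time

def make_global_id(gateway_id: str, local_id: str) -> str:
    """Splice a fixed prefix (the gateway number) and the device's
    local number into a globally unique device ID, e.g. GW01_PLC05."""
    return f"{gateway_id}_{local_id}"

def now_ms() -> int:
    """Millisecond-level Unix timestamp: milliseconds since 1 Jan 1970."""
    return int(time.time() * 1000)
```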
As a further technical solution of the present invention, the S12 specifically includes:
S121, MQTT upload, namely using the lightweight MQTT protocol, the gateway or acquisition end publishes the processed data as messages to a designated topic (Topic). Topics can be organized hierarchically by enterprise number/factory number/workshop number/equipment number, for example /CompanyA/Factory1/Line2/PLC_001. QoS (quality of service) is generally set to "at least once" or "exactly once": at-least-once ensures the message is not lost but duplicates may occur, while exactly-once consumes more resources. The retained-message flag (retain) is generally set to "not retained", so that new subscribers do not receive outdated historical messages. The reported message carries the device number, timestamp, sensor readings and so on. If the connection drops or the server returns an error, a retry count and retry interval can be configured, and after repeated failed attempts an alarm is raised or a local backup is written to prevent data loss;
S122, HTTP interface upload, namely the collected data is sent to a back-end service through an HTTP POST request; an example interface address is http://<back-end domain name>:<port>/api/v1/device/data, with Content-Type: application/json;
If the request fails due to a network problem or a back-end exception, the client retries according to a preset retry strategy (for example, three retries at half-second intervals); after all retries fail, the error is recorded in a local log and an alarm is triggered to notify operation and maintenance personnel.
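The retry strategy above can be sketched as follows; `send` stands in for the actual HTTP POST call (for example, a wrapper around `requests.post`), and the names are illustrative:

```python
import time

def post_with_retry(send, payload, retries=3, interval=0.5):
    """Call send(payload) up to `retries` times, sleeping `interval`
    seconds between attempts.  Returns True on success; on total
    failure the caller logs the error locally and triggers an alarm."""
    for attempt in range(retries):
        try:
            send(payload)
            return True
        except Exception:
            if attempt < retries - 1:
                time.sleep(interval)
    return False
```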
As a further technical solution of the present invention, the S2 specifically includes:
S21, duplicate data filtering and invalid data discarding;
S211, duplicate data filtering, namely if the same device uploads the same timestamp and the same data value to the back end multiple times within a short period, the messages are regarded as duplicates; after receiving data, the system checks whether the device number and timestamp are identical to the previous message and, if so, directly discards the new data to avoid redundant storage;
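A minimal sketch of the duplicate check in S211, keeping the last timestamp seen per device (names are illustrative):

```python
def make_dedup_filter():
    """Drop a message when its (device_id, timestamp) pair is identical
    to the previous message from the same device."""
    last_seen = {}  # device_id -> timestamp of last accepted message

    def accept(msg) -> bool:
        key = msg["device_id"]
        if last_seen.get(key) == msg["timestamp"]:
            return False  # duplicate: discard
        last_seen[key] = msg["timestamp"]
        return True

    return accept
```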
S212, invalid data discarding, namely the message uploaded by a device usually contains a status-code field indicating whether the sampling state is normal; if the status code is abnormal, the data is invalid and is discarded directly without entering the subsequent processing flow. If the value of a key physical quantity such as temperature or pressure clearly exceeds the reasonable range defined by the product specification or the business (for example, a temperature above the rated upper limit or below the rated lower limit), the data is regarded as abnormal, discarded and recorded in a log;
S22, missing data and interpolation, namely to keep the time series continuous, missing sample times sometimes need to be filled in. The common practice is to check the time span between two adjacent valid samples of the same device: if the interval does not exceed a preset maximum (for example, 5 seconds), the missing point is filled by linear interpolation between the surrounding valid values; if the interval is too large, interpolation is skipped and the last valid value or a null value is filled instead, to avoid accumulating errors. For simple cases, last-value filling is used, directly substituting the most recent valid sample for the missing moment. If higher accuracy is required, advanced interpolation algorithms (such as polynomial or spline interpolation) can be applied at the application layer, but their implementation and computation costs are relatively high, so they are rarely used in typical industrial-IoT scenarios;
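The gap-filling rule in S22 can be sketched as a single function (timestamps in milliseconds; the 5-second maximum gap and the names are illustrative):

```python
def fill_gap(t_prev, v_prev, t_next, v_next, t_miss, max_gap_ms=5000):
    """Linearly interpolate the value at a missing sample time when the
    gap between the neighbouring valid samples is within max_gap_ms;
    otherwise fall back to repeating the last valid value."""
    if t_next - t_prev <= max_gap_ms:
        ratio = (t_miss - t_prev) / (t_next - t_prev)
        return v_prev + ratio * (v_next - v_prev)
    return v_prev  # gap too large: no interpolation, repeat last value
```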
S23, data cleaning and standardization, including unit unification, noise filtering, and field renaming and supplementing;
S231, unit unification, namely all physical quantities are converted into standard units; for example, if a field device uploads temperature in Fahrenheit, the system converts it to Celsius before the next step, so that subsequent analysis and storage are consistent;
S232, noise filtering, namely data sometimes needs smoothing because of noise picked up in the field; common methods are moving average and median filtering, such as averaging the last N samples to eliminate short pulse interference, or taking the median of N samples to further remove the influence of extreme values;
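The two smoothing methods in S232 can be sketched directly over the last N samples (function names are illustrative):

```python
from statistics import median

def moving_average(samples, n):
    """Mean of the last n samples: suppresses short pulse interference."""
    window = samples[-n:]
    return sum(window) / len(window)

def median_filter(samples, n):
    """Median of the last n samples: robust against extreme outliers."""
    return median(samples[-n:])
```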
S233, field renaming and supplementing, namely if a downstream system (such as the Kafka producer) requires fixed field names and formats, the original field names are renamed uniformly after cleaning, for example a "temp" field becomes "temperature" and a "press" field becomes "pressure"; if a message lacks certain predefined fields, default values can be supplemented (such as 0 for a number or "unknown" for a string), or null values can be set according to business requirements;
S24, data encapsulation as JSON, namely after cleaning, the data is organized into a unified JSON structure, which generally comprises the following parts: device_id, the unique device identifier; timestamp, the millisecond-level Unix timestamp at which the server received the data; data, the key-value pairs of the core sensor readings (such as temperature, pressure and humidity); status, the device status code indicating whether sampling is normal; and metadata, optional auxiliary fields such as gateway number, signal strength and acquisition-end IP;
Example JSON (illustrative only):
{ "device_id": "GW01_PLC05", "timestamp": 171XXXXXXX000,
"data": { "temperature": 42.8, "pressure": 1.05, "humidity": 58.2}, "status": 0, "metadata": { "gateway_id": "GW01", "signal_strength": -65}};
With this fixed format, when the data is subsequently sent to Kafka or an HTTP interface is called, the JSON structure can be used directly as the message body, and the meaning of each field stays consistent.
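Building the unified envelope of S24 can be sketched as follows; the returned string is what would be used as the Kafka or HTTP message body (function name is illustrative):

```python
import json
import time

def build_message(device_id, readings, status=0, metadata=None):
    """Wrap cleaned readings in the unified JSON envelope of S24."""
    envelope = {
        "device_id": device_id,
        "timestamp": int(time.time() * 1000),  # server receive time, ms
        "data": readings,
        "status": status,
        "metadata": metadata or {},
    }
    return json.dumps(envelope)
```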
As a further technical solution of the present invention, the S3 specifically includes:
S31, Kafka cluster and Topic planning, namely at least 3 Kafka brokers are deployed at the back end to ensure high availability, and Topic names are designed according to business logic (such as companyA_…), converging data from the same production line or the same type of device under the same Topic for centralized processing. An appropriate number of partitions is configured for each Topic according to the peak message volume and the system's concurrency capacity: more partitions mean stronger downstream parallel-consumption capability but higher broker management overhead. The system's peak message rate is generally estimated (for example, 100,000 messages per second) and, combined with the maximum throughput of a single partition, an appropriate partition count such as 8, 16 or 32 is chosen;
S32, Kafka Producer configuration;
S33, message sending and exception handling, namely the Producer calls the asynchronous send interface and checks the result in a callback function: on success a log entry is written or the result is ignored; on failure the reason (such as network timeout or broker unavailable) is logged and the send is retried according to the retry strategy;
And S34, throughput optimization, namely batch-send parameters can be set appropriately, for example sending once a certain number of messages (such as 500) or bytes (such as 32 KB) has accumulated, avoiding the network overhead of frequent small sends; meanwhile "linger.ms" can be set to tens of milliseconds so that the Producer packs together messages arriving within a short window.
As a further technical solution of the present invention, the S32 specifically includes:
S321, broker list, namely the addresses of all brokers are written into the Producer configuration, e.g. broker1:9092, broker2:9092, broker3:9092;
S322, message acknowledgement mechanism (acks), recommended to be set to "all", meaning a send succeeds only after all in-sync replicas have written the message, ensuring no data loss;
S323, retry count, namely failed sends are retried automatically, for example up to 3 times; if a send still fails after the retries, an alarm is triggered or a local log is written, so that data is not silently lost;
S324, serializers, namely the device number sent as the Key is serialized as a string, and the JSON string used as the Value is likewise serialized with UTF-8 encoding;
And S325, partition-assignment strategy, namely the default hash-partitioning strategy is adopted: the device number is hashed and taken modulo the total number of partitions, ensuring that all data of the same device falls into the same partition, which preserves the ordering and sequential consumption of that device's data.
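The partition assignment in S325 can be sketched as follows. Note that `zlib.crc32` stands in here for Kafka's actual default key hash (murmur2); the point illustrated is only that a stable hash modulo the partition count keeps each device on one partition:

```python
import zlib

def partition_for(device_id: str, num_partitions: int) -> int:
    """Hash the message Key (the device number) and take it modulo the
    partition count, so all data of one device lands in one partition."""
    return zlib.crc32(device_id.encode("utf-8")) % num_partitions
```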
As a further technical solution of the present invention, the S4 specifically includes:
S41, table creation and dynamic partitioning, namely a Doris table with dynamic partitioning enabled is created: based on the current date, the system automatically maintains a 5-day window of partitions from 3 days before to 1 day after today, named pYYYYMMDD (for example, p20250606 denotes 6 June 2025), so the current day and the recent days of history always have corresponding partitions and queries are more efficient. A partition retention time is set, for example 30 days: once a partition is older than the retention period (for example, 30 days after creation), the system deletes it automatically, cleaning up expired data and making storage-space management more flexible. A bucketing (hash-distribution) design is applied at the same time, for example distributing by device_id into 16 buckets, so that queries and writes spread different slices of data across different nodes in parallel, improving performance;
S42, creating the Routine Load task, namely a dedicated SQL statement is executed on Doris to create a Routine Load task, specifying which Kafka Topic to pull data from, with what concurrency, the maximum rows or bytes per batch, and how to parse the JSON; the mapping from JSON fields to table columns is provided by a JSONPaths expression that pairs each JSON field with its corresponding column;
In the Routine Load task configuration one can specify the number of concurrent consumers (for example, 4 concurrent threads), the maximum rows (for example, 5000) or maximum bytes (for example, 10 MB) per batch, and a strict-mode switch; with strict mode enabled, a JSON record with missing fields or mismatched types is treated as dirty data and discarded, with a log entry written at the same time, to ensure data consistency;
S43, the data loading flow and dirty-data handling, namely messages are pulled from the Kafka partitions in parallel at the specified concurrency; once a batch reaches the threshold (rows or bytes), it is stored in a temporary file, which the system decompresses and parses, extracting fields from the JSON according to the JSONPaths mapping and writing them into an in-memory table; the in-memory data is automatically routed to the corresponding partition according to the partitioning strategy, and the back-end nodes persist the data in batches to columnar storage;
And S44, dynamic partitioning and expiration cleanup, namely because dynamic partitioning is enabled, Doris checks the current date every day and maintains the 5 partitions from 3 days before to 1 day after it; for example, if the current date is 6 June 2025, the partitions p20250603, p20250604, p20250605, p20250606 and p20250607 exist. Once a partition is older than the retention period (for example, p20250506 after 30 days), the system deletes it automatically without manual intervention, saving storage space; if the production environment needs to keep longer history, the partition-retention parameter is simply changed from 30 days to 60, 90 or more.
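The partition-window and expiry rules of S41/S44 can be reproduced in a few lines, showing the pYYYYMMDD naming for the [-3, +1] day window and the 30-day retention check (function names are illustrative):

```python
from datetime import date, timedelta

def dynamic_partitions(today: date, start=-3, end=1):
    """Names of the daily partitions kept for a window from `start`
    days before to `end` days after today, in pYYYYMMDD form."""
    return [f"p{(today + timedelta(days=d)):%Y%m%d}"
            for d in range(start, end + 1)]

def is_expired(partition_day: date, today: date, keep_days=30) -> bool:
    """A partition is dropped automatically once it is keep_days old."""
    return (today - partition_day).days > keep_days
```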
As a further technical solution of the present invention, the S5 specifically includes:
S51, end-to-end monitoring indicators, comprising acquisition-end monitoring, Kafka monitoring and Doris monitoring;
S511, acquisition-end monitoring:
The number of samples per second or per minute is counted periodically; if it drops or the stream is interrupted, PLC connectivity or the gateway state needs to be checked;
When the gateway uploads to the back end, retries or packet loss may occur under poor network conditions; the ratio of retries to total sends is monitored, and an alarm is raised when it exceeds a threshold (such as 5%);
s512, kafka monitoring:
Consumer lag, namely the gap between the downstream consumer (Routine Load) and the Producer's delivery speed, measured as the difference between each partition's latest write offset and its committed offset; when the lag grows and stays above a threshold (such as 10,000 messages), downstream consumption cannot keep up and the concurrency needs to be scaled up or tuned;
Throughput, namely the message rate (messages/second) and byte rate at which Producers write to Kafka are counted, to determine whether the cluster's maximum capacity is being approached;
And partition balance, namely whether messages are evenly distributed across partitions is checked; if one partition's load is far higher than the others, the partition count is increased or the custom partitioning strategy is changed;
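The consumer-lag metric of S512 reduces to a simple per-partition offset difference; a sketch (the offset-map layout is an assumption for illustration):

```python
def total_lag(partition_offsets):
    """Sum over partitions of (latest write offset - committed consumer
    offset).  A steadily growing value means downstream consumption
    cannot keep up and concurrency should be increased."""
    return sum(latest - committed
               for latest, committed in partition_offsets.values())
```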
S513, doris monitoring:
Routine Load latency, measured as the time difference between the newest data in Kafka and the data written into Doris: the maximum event time (event_time) of inserted data is queried periodically and compared with the current system time; if the latency exceeds 5 minutes continuously, there may be a downstream write-performance bottleneck that needs investigation;
Write load, namely the queries per second or write transactions per second of each BE node are counted; when a node stays under high load (such as CPU usage above 80%), scaling out or parameter tuning is considered;
And partition size, namely the data volume of each partition is monitored; when a partition holds far more data than expected (such as over 100 GB), adjusting the bucket count or splitting the historical partitions is considered;
and S52, performance optimization strategies including Kafka optimization and Doris optimization.
As a further technical solution of the present invention, the S52 specifically includes:
S521 Kafka optimization:
Batch sending and compression, namely a suitable batch wait time (such as tens of milliseconds) is set on the Producer side to aggregate messages arriving within a short window and send them together, and a compression algorithm such as Snappy or LZ4 is enabled, reducing network-bandwidth usage and improving the brokers' write speed;
And a reasonably configured consumer concurrency, namely if the Routine Load concurrency is too low, the consumption speed cannot keep up with production, while if it is too high, the Doris write tasks contend for resources on the back-end nodes;
s522, doris optimization:
Columnar encoding and compression, namely for common fields in the table (such as numeric columns like temperature and pressure), columnar compression encodings (such as RLE and dictionary encoding) are adopted, greatly reducing storage usage and improving scan efficiency;
Small-file merging, namely if frequent writes produce many small files, query speed suffers; compaction can be run periodically so that small files are automatically merged into larger ones;
And statistics maintenance, namely an analyze operation is run on the table periodically to update column histogram statistics, so that the query optimizer can choose better execution plans.
As a further technical solution of the present invention, the step S6 specifically includes:
S61, data-warehouse integration, namely a wide table is built in Doris as the fact table and dimension tables are designed according to business needs; if more complex online analytical processing (OLAP) is required, the data is synchronized to a downstream data warehouse (such as Hive/Hadoop, ClickHouse or Presto) through ETL or scheduled tasks for offline reporting and multidimensional analysis;
S62, real-time computation and alerting, namely a stream-processing engine such as Apache Flink or Spark Streaming, placed between Kafka and Doris or in parallel with Doris, directly consumes the preprocessed data for complex event processing (such as anomaly detection and trend judgment); when indicators such as temperature or pressure exceed preset thresholds, an SMS or e-mail alarm is triggered immediately;
And S63, machine-learning and deep-learning applications, namely feature engineering is performed on the historical time-series data stored in Doris by time window (such as day, week or month), aggregating temperature, pressure, vibration and other readings into statistical features (maximum, minimum, mean, variance and so on) to build the feature table required for model training; a time-series prediction model (such as an LSTM or a Transformer-based model) is trained with a Python framework (such as TensorFlow or PyTorch) to predict a device's short-term health state or failure probability, taking the data of the past N days or N time steps as input and outputting predicted values for future moments; once training is complete and the effect is validated, the model is deployed as an online inference service that periodically fetches the latest time-series data from Doris, compares the prediction with the current thresholds, and gives early warning of potential failures.
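The per-window aggregation described in S63 can be sketched as follows; the feature names are illustrative, and the variance here is the population variance (an assumption, since the disclosure does not specify which):

```python
from statistics import mean, pvariance

def window_features(values):
    """Aggregate one time window of raw readings into the statistical
    features (max, min, mean, variance) used for model training."""
    return {
        "max": max(values),
        "min": min(values),
        "avg": mean(values),
        "var": pvariance(values),
    }
```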
The beneficial effects of the invention are as follows:
1. The method significantly reduces data-processing latency and improves real-time performance: with the gateway publishing to Kafka in real time and Doris consuming in parallel, end-to-end processing latency drops from minute level to under 30 seconds (no more than 1 minute at peak), the data-delivery success rate reaches 99.9% under high-concurrency scenarios, and data blackout windows are effectively avoided, enabling near-real-time monitoring and early warning. This solves the problems of traditional industrial systems that rely on batch import, where 5-10 minutes often pass between field acquisition and queryability, and data loss or backlog easily occurs for lack of buffering.
2. The method improves data quality and consistency and reduces gaps and anomalies: with de-duplication completed before ingestion, the duplicate-upload rate drops below 0.1%; the interpolation success rate for missing points exceeds 95%, with placeholders used only for gaps longer than 5 seconds; and under the back end's strict mode the dirty-data write rate stays below 0.5%, so overall data integrity and consistency are markedly better than traditional schemes. This solves the problems of prior-art direct-write approaches that lack mechanisms for de-duplication, interpolation and anomaly interception, where duplicate-record rates commonly reach 2%, missing data and noise interference are severe, and manual cleaning and back-filling are required afterwards.
3. Automatic partitioning and storage cleanup reduce operation and maintenance costs and improve scalability: using Doris's dynamic-partitioning function, the partitions from several days before the current day to one day after it are maintained automatically and deleted automatically after the configured 30 days without manual intervention; in actual operation the data volume of a single partition is held steadily at 50-80 GB, keeping query and write performance stable; and when the device count grows, only the Kafka brokers and Doris BE nodes need to be scaled out, cutting the partition-management workload by about 90%. This solves the problems of traditional database partitioning, which requires manual creation, archiving and deletion and, as data accumulates, a missed maintenance step can let a single partition exceed a hundred GB and cause performance bottlenecks.
Drawings
Fig. 1 is a flowchart of the Doris-based device data storage method for the industrial Internet according to the present invention.
Detailed Description
The invention is further described below in connection with specific embodiments, so that the technical means, creative features, objectives and effects of the invention are easy to understand.
Referring to fig. 1, a Doris-based device data storage method for the industrial Internet comprises the following specific steps:
S1, acquiring industrial equipment node data, namely collecting data from field PLCs or gateway devices in real time in preparation for subsequent cleaning and storage;
S11, selecting an acquisition mode, namely direct PLC acquisition or gateway-forwarded acquisition;
S111, direct PLC acquisition, namely a communication protocol such as Modbus (TCP/RTU), PROFINET or EtherNet/IP is established between the field PLC and an upper computer via Ethernet or a serial port, and the system initiates read requests to the PLC at a preset sampling frequency (for example, once per second or once every ten seconds) to obtain real-time values from designated registers or data blocks;
S112, gateway-forwarded acquisition, namely if some field devices do not support a conventional PLC protocol, an industrial edge gateway (such as an OPC UA gateway, a Modbus gateway or a Siemens S7 gateway) collects data using the device's own protocol, converts the parsed raw information (usually key-value pairs or a custom binary format) into a unified data format (such as JSON) and forwards it to the back-end system;
S12, transmission protocol and upload mode, namely MQTT upload or HTTP interface upload;
S121, MQTT upload, namely using the lightweight MQTT protocol, the gateway or acquisition end publishes the processed data as messages to a designated topic (Topic). Topics can be organized hierarchically by enterprise number/factory number/workshop number/equipment number, for example /CompanyA/Factory1/Line2/PLC_001. QoS (quality of service) is generally set to "at least once" or "exactly once": at-least-once ensures the message is not lost but duplicates may occur, while exactly-once consumes more resources. The retained-message flag (retain) is generally set to "not retained", so that new subscribers do not receive outdated historical messages. The reported message carries the device number, timestamp, sensor readings and so on. If the connection drops or the server returns an error, a retry count and retry interval can be configured, and after repeated failed attempts an alarm is raised or a local backup is written to prevent data loss;
S122, HTTP interface upload, namely the collected data is sent to a back-end service through an HTTP POST request; an example interface address is http://<back-end domain name>:<port>/api/v1/device/data, with Content-Type: application/json;
If the request fails due to network problems or a back-end abnormality, the client retries according to a preset retry strategy (for example, three retries at half-second intervals); if all retries fail, the client records the error to a local log and triggers an alarm to notify operation and maintenance personnel;
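A minimal sketch of this POST-with-retry behaviour, using only the standard library; the endpoint path and the retry parameters (three retries at half-second intervals) follow the examples in the text, and the logging/alarm step is only indicated by a comment.

```python
import json
import time
import urllib.error
import urllib.request

def post_with_retry(url: str, record: dict,
                    retries: int = 3, interval_s: float = 0.5) -> bool:
    # POST one collected record as JSON; retry on network/server errors.
    # url is an assumed example such as
    # "http://<back-end domain name>:<port>/api/v1/device/data".
    body = json.dumps(record).encode("utf-8")
    for attempt in range(retries + 1):
        req = urllib.request.Request(
            url, data=body,
            headers={"Content-Type": "application/json"}, method="POST")
        try:
            with urllib.request.urlopen(req, timeout=5) as resp:
                if 200 <= resp.status < 300:
                    return True
        except (urllib.error.URLError, OSError):
            pass  # fall through to the next retry
        if attempt < retries:
            time.sleep(interval_s)
    # all retries failed: record error to a local log and trigger an alarm
    return False
```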
S13, time stamp and unique identification processing, namely the acquisition end and the gateway use a consistent time standard, generally a millisecond-level Unix time stamp (the number of milliseconds from 1 January 1970 to the current moment), which ensures that data uploaded by different devices shares a unified time reference; for the unique identification of a device, if the device has no globally unique ID, one can be spliced from a fixed prefix (such as the gateway number) and the device's local number on the gateway, for example GW01_PLC05 as the global unique device number. This ensures that no device confusion occurs during subsequent processing and storage;
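The identifier splicing and millisecond time stamp described above amount to two one-line helpers; names and the underscore separator are illustrative.

```python
import time

def make_device_id(gateway_id: str, local_id: str) -> str:
    # Splice the gateway number and the device's local number into a
    # globally unique ID, e.g. ("GW01", "PLC05") -> "GW01_PLC05".
    return f"{gateway_id}_{local_id}"

def now_ms() -> int:
    # Millisecond-level Unix time stamp: milliseconds since 1970-01-01 UTC.
    return int(time.time() * 1000)
```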
S2, preprocessing the acquired data, namely performing de-duplication, alignment, denoising and standardization on the raw acquired data, ensuring data quality and consistency before subsequent warehousing;
S21, repeated data filtering and invalid data discarding;
S211, repeated data filtering, namely if the same device uploads the same time stamp and the same data values to the back end multiple times within a short period, the data is regarded as repeated; after receiving data, the system checks whether the device number and time stamp are identical to the previous record, and if so, the new data is discarded directly to avoid redundant storage;
S212, invalid data discarding, namely messages uploaded by a device often contain a status code field indicating whether the sampling state is normal; if the status code is abnormal, the data is invalid, discarded directly, and does not enter the subsequent processing flow; likewise, if the value of a key physical quantity such as temperature or pressure clearly exceeds the reasonable range defined by the product specification or the service (for example, above the rated upper limit or below the rated lower limit), it is treated as abnormal data, discarded, and recorded in the log;
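The S211/S212 checks can be sketched as a single stateful filter. The range table and the convention that status code 0 means "normal" are assumptions for illustration; real limits come from the product specification.

```python
def make_filter(valid_ranges: dict):
    # valid_ranges, e.g. {"temperature": (-40.0, 120.0)}, are assumed
    # spec-defined limits; last_seen remembers each device's last record.
    last_seen = {}

    def accept(msg: dict) -> bool:
        # S212: discard samples whose status code marks them abnormal
        if msg.get("status", 0) != 0:
            return False
        # S212: discard values outside the spec-defined reasonable range
        for field, (lo, hi) in valid_ranges.items():
            v = msg["data"].get(field)
            if v is not None and not (lo <= v <= hi):
                return False
        # S211: drop exact repeats (same device, time stamp and values)
        key = msg["device_id"]
        fingerprint = (msg["timestamp"], tuple(sorted(msg["data"].items())))
        if last_seen.get(key) == fingerprint:
            return False
        last_seen[key] = fingerprint
        return True

    return accept
```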
S22, data missing and interpolation, namely to keep the time series continuous, missing moments in the acquisition process sometimes need to be filled in; the common practice is to check the time span between two adjacent valid samples of the same device, and if the interval does not exceed a preset maximum (for example, within 5 seconds), linearly interpolate between the valid values before and after to fill the missing point; if the interval is too large, interpolation is skipped and the last valid value or a null value is filled instead, to avoid error accumulation; in simple cases, latest-value filling is adopted, directly substituting the most recent valid sample for the missing moment; where higher accuracy is required, advanced interpolation algorithms (such as polynomial or spline interpolation) can be applied at the application layer, but their implementation and computation costs are relatively high and they are rarely used in typical industrial Internet of Things scenarios;
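The gap-filling rule above reduces to: interpolate linearly if the gap is within the maximum interval, otherwise fall back to the last valid value. A sketch, with the 5-second limit from the text as the default:

```python
def fill_gap(t_prev: int, v_prev: float, t_next: int, v_next: float,
             t_missing: int, max_gap_ms: int = 5000) -> float:
    # Linear interpolation between two valid samples when the gap is small;
    # beyond max_gap_ms, repeat the last valid value (no interpolation),
    # which avoids accumulating interpolation error over long outages.
    if t_next - t_prev <= max_gap_ms:
        ratio = (t_missing - t_prev) / (t_next - t_prev)
        return v_prev + ratio * (v_next - v_prev)
    return v_prev  # gap too large: latest-value filling instead
```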
S23, data cleaning and standardization, including unit unification, noise filtering, and field renaming and supplementing;
S231, unit unification, namely converting all physical quantities into standard units; for example, if a field device uploads temperature in Fahrenheit, the system converts it to Celsius before the next step, so that subsequent analysis and storage are consistent;
S232, noise filtering, namely data sometimes needs smoothing because of noise picked up on site; common methods are moving average and median filtering, for example averaging the values of the last N samples to eliminate short pulse interference, or taking the median of N samples to further remove the influence of extreme values;
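Both smoothing methods keep a sliding window of the last N samples; a minimal sketch (class and method names are illustrative):

```python
from collections import deque
from statistics import median as _median

class Smoother:
    # Keep the last N samples; expose a moving-average filter (suppresses
    # short pulse interference) and a median filter (removes extreme values).
    def __init__(self, n: int = 5):
        self.window = deque(maxlen=n)

    def add(self, value: float) -> None:
        self.window.append(value)

    def moving_average(self) -> float:
        return sum(self.window) / len(self.window)

    def median(self) -> float:
        return _median(self.window)
```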
S233, field renaming and supplementing, namely if a downstream system (such as the Kafka producer) requires fixed field names and formats, the original field names are renamed uniformly, for example a 'temp' field becomes 'temperature' and a 'press' field becomes 'pressure' after cleaning; if a message lacks certain predefined fields, default values can be supplemented (such as 0 for a number, or 'unknown'), or null values can be set according to service requirements;
S24, data encapsulation as JSON, namely after cleaning, the data is organized into a unified JSON structure, which generally comprises the following parts: device_id, the unique device identification; timestamp, the millisecond-level Unix time stamp when the server receives the data; data, the key-value pairs of the core sensor readings (such as temperature, pressure and humidity); status, the device status code indicating whether sampling is normal; and metadata, optional auxiliary fields such as the gateway number, signal strength and acquisition-end IP;
Example JSON (illustrative only):
{ "device_id": "GW01_PLC05", "timestamp": 171XXXXXXX000,
"data": { "temperature": 42.8, "pressure": 1.05, "humidity": 58.2}, "status": 0, "metadata": { "gateway_id": "GW01", "signal_strength": -65}};
With this fixed format adopted, when the data is subsequently sent to Kafka or an HTTP interface is called, the JSON structure can be used directly as the message body, and the meaning of each field stays consistent;
S3, uploading the preprocessed data to Kafka, namely achieving data buffering and decoupling through a distributed message queue, enabling downstream parallel consumption and good scalability;
S31, Kafka cluster and Topic planning, namely deploying at least 3 Kafka Brokers at the back end to ensure high availability, and designing Topic names according to business logic (for example, prefixed by company and factory number), converging data from the same production line or the same type of device under the same Topic for centralized processing; a suitable number of partitions is configured for each Topic according to the peak message volume and the system's concurrency capacity: more partitions give stronger downstream parallel consumption but increase Broker management overhead, so the peak message rate of the system (for example, 100,000 messages per second) is estimated and, combined with the maximum throughput of a single partition, an appropriate partition count such as 8, 16 or 32 is chosen;
S32, Kafka Producer configuration;
S321, Broker list, namely writing the addresses of all Brokers into the Producer configuration, e.g. Broker1:9092, Broker2:9092, Broker3:9092;
S322, message confirmation mechanism (acks), recommended to be set to 'all', meaning a send is only considered successful after all in-sync replicas have written the message, ensuring data is not lost;
S323, retry count, namely the Producer automatically retries on send failure, for example with the count set to 3; if sending still fails after all retries, an alarm is triggered or a local log is written to prevent data from being silently lost;
S324, serializer, namely a string serializer with UTF-8 encoding is used for the device number transmitted as the Key, and likewise for the JSON string transmitted as the Value;
S325, partition allocation strategy, namely the default hash partitioning strategy is adopted: the device number is hashed and taken modulo the total partition count, so that all data of the same device falls into the same partition, guaranteeing per-device ordering and sequential consumption;
S33, message sending and exception handling, namely the Producer calls the asynchronous send interface and monitors the result in a callback function; on success a log entry is written or the result is ignored, and on failure the cause (such as network timeout or Broker unavailability) is logged and the send is retried according to the retry strategy;
S34, throughput optimization, namely batch sending parameters can be set appropriately, for example sending once a certain message count (such as 500 messages) or byte size (such as 32 KB) has accumulated, avoiding the network overhead of frequent small sends;
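The producer settings in S321 through S34 can be gathered into one configuration, sketched below in kafka-python parameter style (the parameter names are an assumption of that client library; the values mirror the examples in the text). The hash-partitioning rule of S325 is shown as a standalone function.

```python
import zlib

def partition_for(device_id: str, num_partitions: int) -> int:
    # S325: hash the device number and take it modulo the partition count,
    # so all data of one device lands in the same partition (ordering kept).
    # CRC32 is used because Python's built-in hash() is salted per process.
    return zlib.crc32(device_id.encode("utf-8")) % num_partitions

# Producer settings in kafka-python style (parameter names assumed;
# values follow S321-S34 and the compression advice in S521).
PRODUCER_CONFIG = {
    "bootstrap_servers": ["Broker1:9092", "Broker2:9092", "Broker3:9092"],
    "acks": "all",             # S322: wait for all in-sync replicas
    "retries": 3,              # S323: retry transient send failures
    "batch_size": 32 * 1024,   # S34: flush once 32 KB has accumulated
    "linger_ms": 20,           # S521: short batch wait time (tens of ms)
    "compression_type": "lz4", # S521: reduce network bandwidth
}
```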
S4, doris consumes the Kafka data and stores the data, namely, doris pulls the JSON data from the Kafka in real time and writes the JSON data into a relational table, and meanwhile, automatic partitioning and expiration cleaning of the data are realized by means of dynamic partitioning;
S41, table and partition design, namely the system automatically creates a 5-day window of partitions from 3 days before to 1 day after the current date; partitions are named pYYYYMMDD, for example p20250606 for 6 June 2025, ensuring that today's data and the recent history always have a corresponding partition and that queries are more efficient; a partition retention time is also set, for example 30 days, meaning that once a partition has existed for that many days the system deletes it automatically, achieving automatic cleanup of expired data and more flexible storage management; in addition, a bucketing (hash bucketing) design is applied, for example bucketing by device_id with 16 buckets, so that on query and write, different slices of data are spread across different nodes in parallel, improving performance;
S42, creating the Routine Load task, namely executing a dedicated SQL statement on Doris to create a Routine Load task, designating which Kafka Topic to pull data from, the concurrency to pull with, the maximum rows or bytes per batch, and how to parse the JSON, where the mapping from JSON fields to table columns is provided by a JSONPaths specification, with a correspondence approximately as follows;
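One plausible JSONPaths mapping for the message structure of S24 is sketched below, together with a minimal stand-in showing how each path pulls a column value out of one message. The column order and names are illustrative assumptions, not the method's fixed schema.

```python
import json

# Paths in the jsonpaths form a Routine Load task would reference,
# matching the JSON structure defined in S24 (illustrative).
JSONPATHS = json.dumps([
    "$.device_id",
    "$.timestamp",
    "$.data.temperature",
    "$.data.pressure",
    "$.data.humidity",
    "$.status",
])

def extract(record: dict) -> list:
    # Minimal stand-in for how the JSONPaths above map one message
    # to a row of table columns (missing fields become None).
    return [
        record["device_id"],
        record["timestamp"],
        record["data"].get("temperature"),
        record["data"].get("pressure"),
        record["data"].get("humidity"),
        record.get("status"),
    ]
```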
In the Routine Load task configuration, the number of concurrent consumers (such as 4 concurrent threads), the maximum rows (such as 5000 rows) or maximum bytes (such as 10 MB) per batch, and a strict mode switch can be specified; with strict mode enabled, if fields in a JSON record are missing or the types do not match, the record is treated as dirty data and discarded, with a log entry written at the same time, to ensure data consistency;
S43, data loading flow and dirty data processing, namely messages are pulled in parallel from the Kafka partitions according to the specified concurrency; messages accumulated up to a threshold (row count or bytes) are staged in a temporary file, which the system decompresses and parses, extracting fields from the JSON according to the JSONPaths mapping and writing them into an in-memory table; the in-memory data is automatically distributed to the corresponding partitions according to the partition strategy, and the back-end nodes persist the data in batches to columnar storage;
S44, dynamic partitioning and expiration cleanup, namely with the dynamic partition function enabled, Doris automatically determines the current date each day and creates the 5 partitions from 3 days before to 1 day after it; for example, if the current date is 6 June 2025, then p20250603, p20250604, p20250605, p20250606 and p20250607 are created; once a partition has existed for the configured number of days (for example, p20250506 expiring after 30 days), the system deletes it automatically without manual intervention, saving storage space; if the production environment needs to keep longer history, only the partition expiration parameter needs changing from 30 days to 60 days, 90 days or longer;
S5, monitoring and performance optimization, namely monitoring the end-to-end link from acquisition to storage, finding bottlenecks and anomalies in time, and optimizing accordingly;
S51, end-to-end monitoring indexes, comprising acquisition-end monitoring, Kafka monitoring and Doris monitoring;
S511, acquisition-end monitoring:
The sampling count per second or per minute is tallied periodically; if it drops or is interrupted, PLC connectivity or the gateway state needs checking;
When the gateway uploads to the back end, retries or packet loss can occur under poor network conditions; the ratio of retries to total sends is monitored, and an alarm is raised when the ratio exceeds a threshold (such as 5%);
S512, Kafka monitoring:
Consumer lag (Consumer Lag), namely the gap between the consumption speed of the downstream consumer (Routine Load) and the delivery speed of the Producer, measured as the difference between each partition's current write offset and its committed offset; when the lag grows and stays above a threshold (such as 10,000 messages), downstream consumption cannot keep up and the concurrency needs to be scaled up or tuned;
Throughput, namely counting the rate (messages/second) and byte rate at which the Producer writes to Kafka, to confirm whether the cluster's maximum capacity is being approached;
Partition balance, namely checking whether messages are distributed evenly across partitions; if one partition's load is far higher than the others, the partition count can be increased or a custom partitioning strategy adopted;
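The lag metric above is a simple per-partition difference; a sketch, assuming the caller has already fetched the latest write offsets and the committed consumer offsets from Kafka:

```python
def consumer_lag(log_end_offsets: dict, committed_offsets: dict) -> int:
    # Lag per partition = latest write offset - committed consume offset;
    # a total lag that stays above a threshold (e.g. 10,000 messages)
    # means the downstream consumer cannot keep up.
    return sum(
        log_end_offsets[p] - committed_offsets.get(p, 0)
        for p in log_end_offsets
    )
```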
S513, doris monitoring:
Routine Load delay, measured as the time difference between the latest data in Kafka and the data written into Doris, by periodically comparing the maximum event time (event_time) of inserted rows with the current system time; if the delay persists beyond 5 minutes, a downstream write performance bottleneck is likely and should be investigated;
Write load, namely counting queries per second or transactions per second on each BE node during writes; when a node is under high load (such as CPU usage above 80%), scaling out or tuning parameters should be considered;
Partition data volume, namely monitoring the size of each partition; when a partition far exceeds expectations (such as more than 100 GB), adjusting the bucket count or splitting the historical partition should be considered;
S52, performance optimization strategies, including Kafka optimization and Doris optimization;
S521, Kafka optimization:
Batch sending and compression, namely setting a suitable batch wait time (such as tens of milliseconds) at the Producer end so that messages arriving within a short window are aggregated and sent together, and enabling a compression algorithm such as Snappy or LZ4, which reduces network bandwidth usage and improves Broker write speed;
Reasonable consumer concurrency, namely if the Routine Load concurrency is too low the consumption speed cannot keep up with production, while if it is too high the Doris write tasks contend for resources on the back-end nodes;
S522, Doris optimization:
Columnar encoding and compression, namely applying columnar compression encodings (such as RLE and dictionary encoding) to common fields in the table (such as numeric columns like temperature and pressure), greatly reducing storage footprint and improving scan efficiency;
Small-file merging, namely frequent writes can generate many small files that slow queries; partition file compaction can be run periodically so that small files are automatically merged into larger ones;
Statistics maintenance, namely periodically running an analyze operation on the table to update column histogram statistics, so the query optimizer can choose better execution plans;
S6, extended secondary utilization and downstream analysis, namely supporting subsequent data warehouse integration, real-time computation and machine learning applications on the premise of guaranteed data quality and real-time performance;
S61, data warehouse integration, namely constructing a wide fact table in Doris and designing dimension tables according to service needs; if more complex online analytical processing (OLAP) is required, data can be synchronized to a downstream data warehouse (such as Hive/Hadoop, ClickHouse or Presto) through ETL or scheduled tasks for offline reporting and multidimensional analysis;
S62, real-time computation and alerting, namely a stream processing engine such as Apache Flink or Spark Streaming, placed between Kafka and Doris or in parallel with Doris, directly consumes the preprocessed data for complex event processing (such as anomaly detection and trend judgment); when indexes such as temperature or pressure exceed preset thresholds, an SMS or e-mail alarm is triggered immediately;
S63, machine learning and deep learning applications, namely performing feature engineering on the historical time series stored in Doris by time window (such as day, week and month), aggregating temperature, pressure, vibration and other data into statistical features (maximum, minimum, mean, variance, etc.) to build the feature table required for model training; a time series prediction model (such as an LSTM or Transformer-based model) is trained with a Python framework (such as TensorFlow or PyTorch) to predict a device's short-term health state or failure probability, taking the data of the past N days or N time steps as input and outputting predictions for future moments; after training is complete and the effect verified, the model is deployed as an online inference service that periodically fetches the latest time series from Doris, compares the generated predictions with the current thresholds, and gives early warning of potential failures.
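The window aggregation feeding the feature table can be sketched as a single function; the feature names and the choice of population variance are illustrative assumptions.

```python
from statistics import mean, pvariance

def window_features(samples: list) -> dict:
    # Aggregate one time window (e.g. a day) of a metric such as
    # temperature into the statistical features used for model training.
    return {
        "max": max(samples),
        "min": min(samples),
        "avg": mean(samples),
        "var": pvariance(samples),  # population variance of the window
    }
```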
As can be seen from the above description, the embodiments of the invention significantly reduce data processing delay and improve real-time performance: data is uploaded through the gateway to Kafka in a timely manner and consumed by Doris in parallel, reducing end-to-end processing delay from minutes to under 30 seconds (no more than 1 minute at peak); the data delivery success rate reaches 99.9% under high-concurrency scenarios, effectively avoiding a 'data empty window period' and enabling near-real-time monitoring and early warning. This solves the problem that traditional industrial systems mostly rely on batch import, often taking 5-10 minutes from field acquisition, and easily lose or backlog data packets when buffering is absent.
The method improves data quality and consistency and reduces defects and anomalies: with deduplication performed before upload, the repeated-upload rate drops below 0.1%; the interpolation success rate for missing points exceeds 95%, with placeholder values used only for gaps longer than 5 seconds; under back-end strict mode the dirty-data write rate stays below 0.5%; overall data integrity and consistency are clearly superior to traditional schemes. This addresses the prior-art direct-write approach, which lacks mechanisms for deduplication, interpolation and anomaly interception, commonly suffers a duplicate-record rate as high as 2%, has serious missing data and noise interference, and requires subsequent manual cleaning and backfilling.
Automatic partitioning and storage cleanup reduce operation and maintenance cost and improve scalability: using Doris's dynamic partition function, partitions from the days before the current day to the day after it are maintained automatically, and partitions are deleted automatically after the configured 30 days without manual intervention; in actual operation the data volume of a single partition is stably controlled at 50-80 GB, keeping query and write performance stable; when the number of devices grows, only Kafka Brokers and Doris BE nodes need to be added, reducing the partition-management workload by about 90%. This solves the problem that traditional database partitions require manual creation, archiving and deletion, a heavy workload as data accumulates, and that a missed partition-management step can let a single partition exceed a hundred GB, causing performance bottlenecks.
It will be appreciated by persons skilled in the art that the above discussion of any embodiment is merely exemplary and is not intended to imply that the scope of the invention is limited to these examples, that combinations of technical features in the above embodiments or in different embodiments may also be implemented in any order within the spirit of the invention, and that many other variations of the different aspects of the invention as described above exist, which are not provided in detail for the sake of brevity.
The present invention is intended to embrace all such alternatives, modifications and variances which fall within the broad scope of the specification. Therefore, any omission, modification, equivalent replacement, improvement, etc. of the present invention should be included in the scope of the present invention.