
CN120386780B - Device data storage Doris method based on industrial Internet - Google Patents

Device data storage Doris method based on industrial Internet

Info

Publication number
CN120386780B
CN120386780B (application CN202510873588A)
Authority
CN
China
Prior art keywords
data
doris
kafka
partition
time
Prior art date
Legal status
Active
Application number
CN202510873588.XA
Other languages
Chinese (zh)
Other versions
CN120386780A (en)
Inventor
牟文青
时雨
李海鹏
解贞东
卢基
Current Assignee
Shandong Jerei Digital Technology Co Ltd
Original Assignee
Shandong Jerei Digital Technology Co Ltd
Priority date
Filing date
Publication date
Application filed by Shandong Jerei Digital Technology Co Ltd filed Critical Shandong Jerei Digital Technology Co Ltd
Priority claimed from CN202510873588.XA
Publication of CN120386780A
Application granted
Publication of CN120386780B
Legal status: Active


Landscapes

  • Information Retrieval, Db Structures And Fs Structures Therefor (AREA)

Abstract


The present invention relates to the field of data storage technology, specifically disclosing a Doris method for device data storage based on the Industrial Internet. The method comprises the following specific steps: S1: acquiring industrial device node data; S2: preprocessing the acquired data; S3: uploading the preprocessed data to Kafka; S4: Doris consuming and storing Kafka data; S5: monitoring and performance optimization; and S6: expansion: secondary utilization and downstream analysis. The method significantly reduces data processing latency, improving real-time performance; enhances data quality and consistency, reducing missing and anomalies; and automates partitioning and storage cleanup to reduce operation and maintenance costs and improve scalability.

Description

Device data storage Doris method based on industrial Internet
Technical Field
The invention relates to the technical field of data storage, in particular to a device data storage Doris method based on the industrial Internet.
Background
With the development of the industrial Internet and the Internet of Things, the volume of data collected from equipment has grown rapidly, and so have the requirements for real-time performance, reliability and horizontal scalability.
Existing schemes mainly include writing data directly into a relational database, big-data storage based on the Hadoop ecosystem, a combination of message middleware and a database, and dedicated time-series databases. Each of these schemes has obvious drawbacks. A relational database easily hits a performance bottleneck under highly concurrent writes and requires sharding or scale-up, which is costly and complex to operate and maintain. The big-data scheme requires deploying Kafka, HDFS, YARN, Hive/Spark and other clusters, so the architecture is complex, operation and maintenance costs are high, multi-replica HDFS storage wastes resources, and real-time performance is poor. The message-middleware-plus-database scheme requires additional consumer programs, the consumer-side logic is complex, scalability is limited, and historical data archiving is inconsistent and error-prone. A dedicated time-series database is optimized for time-series data, but cluster performance easily becomes a bottleneck under massive concurrent writes, and TTL-style retention policies lack the flexibility to meet the differentiated needs of different devices and business scenarios. In addition, a time-series database may also suffer from data skew, query timeouts and other problems, increasing the risk of system instability.
In general, it is difficult for the prior art to combine highly concurrent writing, second-level analysis, controllable storage cost and flexible partition management, so a new method for storing data collected from equipment is needed to overcome the above drawbacks.
Disclosure of Invention
The invention aims to solve the problems in prior-art device data storage of write performance bottlenecks, complex system architecture, high storage cost and inflexible partition management, and provides a device data storage Doris method based on the industrial Internet, which realizes highly concurrent writing, real-time query and automatic data archiving management.
In order to achieve the above purpose, the present invention adopts the following technical scheme:
An equipment data storage Doris method based on industrial Internet comprises the following specific steps:
S1, acquiring industrial equipment node data, namely acquiring the data of a field PLC or gateway equipment in real time, and preparing for subsequent cleaning and storage;
S2, preprocessing the acquired data, namely performing deduplication, gap filling, denoising and standardization on the originally acquired data, ensuring the quality and consistency of the data subsequently written to storage;
S3, publishing the preprocessed data to Kafka, namely achieving data buffering and decoupling through a distributed message queue, so that downstream consumers can consume in parallel with good scalability;
S4, Doris consuming the Kafka data and storing it, namely, Doris pulls the JSON data from Kafka in real time and writes it into a relational table, while automatic partitioning and expiration cleanup of the data are realized by means of dynamic partitioning;
S5, monitoring and performance optimization, namely monitoring an end-to-end link from acquisition to storage, finding out bottlenecks and anomalies in time, and performing targeted optimization;
And S6, extension: secondary utilization and downstream analysis, supporting subsequent data-warehouse integration, real-time computation and machine-learning applications on the premise of ensuring data quality and real-time performance.
As a further technical solution of the present invention, the S1 specifically includes:
S11, selecting an acquisition mode, namely directly connecting the PLC to acquire or forwarding the acquisition by a gateway;
S111, PLC direct-connection acquisition, namely establishing communication via protocols such as Modbus (TCP/RTU), PROFINET, EtherNet/IP and the like between the on-site PLC and an upper computer through Ethernet or a serial port, the system initiating read requests to the PLC at a preset sampling frequency (for example, once every second or once every ten seconds) to acquire real-time values from designated registers or data blocks;
S112, gateway forwarding acquisition, namely, if some field devices do not support the conventional PLC protocols, data is collected through an industrial edge gateway (such as an OPC UA gateway, a Modbus gateway, a Siemens S7 gateway and the like), which speaks the device's native protocol, converts the parsed raw information (usually key-value pairs or a custom binary format) into a unified data format (such as JSON), and forwards it to the back-end system;
S12, a transmission protocol and a throwing mode, namely MQTT throwing or HTTP interface throwing;
And S13, timestamp and unique identifier handling, namely, the acquisition end and the gateway use a consistent time standard, generally a millisecond-level Unix timestamp (the number of milliseconds elapsed since 1 January 1970), which ensures that data uploaded by different devices share a unified time reference; for the unique identifier of a device, if the device does not have a globally unique ID, one can be spliced from a fixed prefix (such as the gateway number) and the device's local number on the gateway, for example GW01_PLC05 can serve as the globally unique device number. This ensures that no confusion of devices will occur in subsequent processing and storage.
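As a minimal illustration of S13, the snippet below builds a millisecond-level Unix timestamp and splices a global device identifier from a gateway prefix and a local number; the function and variable names are illustrative assumptions, not part of the invention.

```python
import time

def make_global_id(gateway_id: str, local_id: str) -> str:
    # Splice a fixed gateway prefix and the device's local number, e.g. "GW01_PLC05".
    return f"{gateway_id}_{local_id}"

def now_ms() -> int:
    # Milliseconds elapsed since 1970-01-01 (Unix epoch), the unified time reference.
    return int(time.time() * 1000)

print(make_global_id("GW01", "PLC05"), now_ms())
```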
As a further technical solution of the present invention, the S12 specifically includes:
S121, MQTT publishing, namely, using the lightweight MQTT protocol, the gateway or acquisition end publishes the processed data as messages to a designated topic (Topic), where topics can be organized hierarchically by enterprise number/factory number/workshop number/equipment number, for example /CompanyA/Factory1/Line2/PLC_001; the QoS (quality of service) level is generally set to "at least once" (QoS 1) or "exactly once" (QoS 2): at least once ensures that messages are not lost although duplicates may occur, while exactly once consumes more resources; the retained-message flag (retain) is generally set to "not retained", so that newly connected subscribers do not receive outdated historical messages; the information carried in each report includes the device number, timestamp, sensor readings and the like; if the connection is broken or the server returns an error, a retry count and retry interval can be configured, and after multiple failed attempts an alarm is raised or a local backup is written, to prevent data loss;
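The following is a minimal, hedged sketch of the MQTT publishing path described in S121, using the paho-mqtt client (1.x-style constructor); the broker address, topic and retry parameters are illustrative assumptions, not values fixed by the invention.

```python
import json
import time
import paho.mqtt.client as mqtt

BROKER_HOST = "mqtt.example.local"            # assumed broker address
TOPIC = "/CompanyA/Factory1/Line2/PLC_001"    # hierarchical topic as in the example above

def publish_with_retry(payload: dict, retries: int = 3, interval_s: float = 0.5) -> bool:
    client = mqtt.Client()
    client.connect(BROKER_HOST, 1883)
    body = json.dumps(payload)
    for _ in range(retries):
        # QoS 1 ("at least once"); retain=False so late subscribers do not get stale data.
        info = client.publish(TOPIC, body, qos=1, retain=False)
        info.wait_for_publish()
        if info.rc == mqtt.MQTT_ERR_SUCCESS:
            client.disconnect()
            return True
        time.sleep(interval_s)
    client.disconnect()
    return False  # caller should raise an alarm or write a local backup
```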
S122, HTTP interface upload, namely, the collected data is sent to a back-end service through an HTTP POST request; interface address example: http://<back-end domain name>:<port>/api/v1/device/data, with Content-Type: application/json and the collected data serialized as JSON in the request body;
If the request fails due to a network problem or a back-end abnormality, the client retries according to a preset retry strategy (such as three retries at half-second intervals); after all retries fail, the error information is recorded in a local log and an alarm is triggered to notify the operation and maintenance personnel.
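A hedged sketch of the HTTP upload path in S122 follows; the endpoint URL and retry policy are illustrative assumptions consistent with the example above.

```python
import json
import logging
import time
import requests

ENDPOINT = "http://backend.example.local:8080/api/v1/device/data"  # assumed address

def upload(record: dict, retries: int = 3, interval_s: float = 0.5) -> bool:
    for _ in range(retries):
        try:
            resp = requests.post(ENDPOINT, data=json.dumps(record),
                                 headers={"Content-Type": "application/json"}, timeout=5)
            if resp.status_code == 200:
                return True
        except requests.RequestException:
            pass
        time.sleep(interval_s)
    # After all retries fail, log locally; the caller can then trigger an alarm.
    logging.error("upload failed after %d retries: %s", retries, record.get("device_id"))
    return False
```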
As a further technical solution of the present invention, the S2 specifically includes:
S21, duplicate data filtering and invalid data discarding;
S211, duplicate data filtering, namely, if the same device uploads the same timestamp and the same data values to the back end several times within a short period, they are regarded as duplicate data; after receiving the data, the system checks whether the device number and timestamp are exactly the same as the previous record, and if so, the newly received data is discarded directly to avoid redundant storage;
S212, invalid data discarding, namely, the message uploaded by a device usually contains a status-code field indicating whether the sampling state is normal; if the status code is abnormal, the data is invalid, is discarded directly and does not enter the subsequent processing flow; if the value of a key physical quantity such as temperature or pressure obviously exceeds the reasonable range defined by the product specification or the business (for example, the temperature is above the rated upper limit or below the rated lower limit), it is regarded as abnormal data, discarded and recorded in a log (see the sketch immediately below);
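A minimal sketch of the duplicate filtering and validity check in S21; it assumes a status code of 0 means normal and uses illustrative range limits, which are examples rather than values prescribed by the invention.

```python
_last_seen: dict[str, int] = {}            # device_id -> last accepted timestamp (ms)
RANGES = {"temperature": (-40.0, 150.0)}   # assumed reasonable range per physical quantity

def accept(record: dict) -> bool:
    dev, ts = record["device_id"], record["timestamp"]
    if _last_seen.get(dev) == ts:
        return False                       # duplicate: same device, same timestamp
    if record.get("status", 0) != 0:
        return False                       # invalid sampling state
    for key, (lo, hi) in RANGES.items():
        value = record.get("data", {}).get(key)
        if value is not None and not (lo <= value <= hi):
            return False                   # outside the defined reasonable range
    _last_seen[dev] = ts
    return True
```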
S22, data gaps and interpolation, namely, to keep the time series continuous, missing moments in the acquisition process sometimes need to be filled in; the common practice is to check the time span between two adjacent valid samples of the same device: if the interval does not exceed a preset maximum (for example, within 5 seconds), the missing points are filled by linear interpolation between the valid values before and after; if the interval is too large, interpolation is no longer performed and the last valid value or a null value is filled instead, to avoid accumulating errors; for simple cases, last-value filling is used, i.e. the most recent valid sample directly replaces the missing moment; if higher accuracy is required, advanced interpolation algorithms (such as polynomial or spline interpolation) can be used at the application layer, but their implementation and computation costs are relatively high, so they are rarely used in common industrial Internet of Things scenarios (a linear-interpolation sketch follows below);
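The linear interpolation described in S22 can be sketched as follows; the 5-second maximum gap mirrors the example in the text, while the 1-second sampling step is an assumption.

```python
def fill_gaps(samples, step_ms=1000, max_gap_ms=5000):
    """samples: list of (timestamp_ms, value) sorted by time; returns a gap-filled list."""
    filled = [samples[0]]
    for (t0, v0), (t1, v1) in zip(samples, samples[1:]):
        gap = t1 - t0
        if step_ms < gap <= max_gap_ms:
            # Linear interpolation between the two surrounding valid values.
            for t in range(t0 + step_ms, t1, step_ms):
                filled.append((t, v0 + (v1 - v0) * (t - t0) / gap))
        elif gap > max_gap_ms:
            # Gap too large: carry the last valid value forward instead of interpolating.
            for t in range(t0 + step_ms, t1, step_ms):
                filled.append((t, v0))
        filled.append((t1, v1))
    return filled

print(fill_gaps([(0, 10.0), (3000, 13.0), (10000, 20.0)]))
```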
S23, data cleaning and standardization, including unit unification, noise filtering, and field renaming and supplementing;
S231, unit unification, namely converting all physical quantities into standard units; for example, if the temperature uploaded by a field device is in Fahrenheit, the system converts it to Celsius before the next step, so that subsequent analysis and storage are consistent;
S232, noise filtering, namely, data sometimes needs smoothing because of noise picked up on site; common methods are moving-average or median filtering, such as averaging the last N samples to eliminate short pulse interference, or taking the median of the last N samples to further remove the influence of extreme values (see the sketch after this list);
S233, field renaming and supplementing, namely, if a downstream system (such as the Kafka producer) requires fixed field names and formats, the original field names need to be renamed uniformly, for example a 'temp' field becomes 'temperature' and a 'press' field becomes 'pressure' after cleaning; if a message lacks certain predefined fields, default values can be supplemented (such as 0 or 'unknown'), or null values can be set according to business requirements;
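For S232, a moving-average and a median filter over the last N samples might look like the sketch below; the window size is an illustrative assumption.

```python
from collections import deque
from statistics import median

class Smoother:
    def __init__(self, window: int = 5):
        self.buf = deque(maxlen=window)    # keeps the last N samples

    def add(self, value: float) -> None:
        self.buf.append(value)

    def moving_average(self) -> float:
        # Average of the last N samples, suppressing short pulse interference.
        return sum(self.buf) / len(self.buf)

    def median(self) -> float:
        # Median of the last N samples, robust against extreme values.
        return median(self.buf)
```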
S24, encapsulating the data as JSON, namely, after cleaning is finished, the data is organized into a unified JSON structure, which generally contains the following parts: device_id, the unique device identifier;
timestamp, the millisecond-level Unix timestamp at which the server received the data; data, the key-value pairs of the core sensor readings (such as temperature, pressure, humidity and the like); status, the device status code indicating whether sampling is normal; and metadata, optional auxiliary fields such as gateway number, signal strength and acquisition-end IP;
Example JSON (illustrative only):
{ "device_id": "GW01_PLC05", "timestamp": 171XXXXXXX000,
"data": { "temperature": 42.8, "pressure": 1.05, "humidity": 58.2}, "status": 0, "metadata": { "gateway_id": "GW01", "signal_strength": -65}};
After the fixed format is adopted, when the fixed format is subsequently sent to Kafka or an HTTP interface is called, the data of the JSON structure can be directly used as a message body, and the meaning of each field is kept consistent.
As a further technical solution of the present invention, the S3 specifically includes:
S31, Kafka cluster and Topic planning, namely, at least 3 Kafka Brokers are deployed at the back end to ensure high availability of the system, and Topic names are designed according to business logic, for example with a 'CompanyA_' prefix,
so that data from the same production line or the same type of equipment converges under the same Topic for centralized processing; an appropriate number of partitions is configured for the Topic according to the peak message volume and the system's concurrency capacity: the more partitions, the stronger the downstream parallel consumption capacity, but the greater the Broker management overhead; generally the peak message rate of the system is estimated (such as 100,000 messages per second) and, combined with the maximum throughput of each partition, a suitable partition count such as 8, 16 or 32 is finally determined;
s32, kafka Producer configuration;
S33, message sending and exception handling, wherein the Producer calls an asynchronous sending interface and monitors a sending result in a callback function, if the sending is successful, a successful log is recorded or ignored, if the sending is failed, the reason of the sending failure (such as network timeout, broker is unavailable and the like) is recorded to the log, and retrying is performed according to a retry strategy;
The throughput optimization is that batch transmission parameters can be set appropriately, for example, the batch transmission is carried out once when a certain number (such as 500 bars) is accumulated or a certain byte number (such as 32 KB) is reached, network overhead caused by frequent small batch transmission is avoided, and meanwhile, the 'linger.ms' can be set to be tens of milliseconds, so that the message arrived in a short time is packed by the Producer as much as possible.
As a further technical solution of the present invention, the S32 specifically includes:
S321: a Broker list, writing the addresses of all the brokers into the configuration of the Producer, e.g. Broker1:9092, broker2:9092, broker3:9092;
S322, message acknowledgement mechanism (acks): it is recommended to set it to 'all', meaning a send succeeds only after the message has been written to all in-sync replicas, ensuring that data is not lost;
S323, retry count: the Producer automatically retries when a send fails, for example the retry count is set to 3; if sending still fails after several retries, an alarm is triggered or a local log is written, so that data is not silently lost;
S324, a serializer, wherein a character string is used for serializing when the equipment number is transmitted as a Key, and UTF-8 code is also used for serializing when the JSON character string is used as a Value;
And S325, partitioning and distributing strategies, namely adopting a default hash partitioning strategy, namely performing modulo operation on the total number of partitions after hashing by using equipment numbers, so that all data of the same equipment can be ensured to fall into the same partition, and the time sequence and sequential consumption of the data of the same equipment are ensured.
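Putting the S32 items together, a hedged kafka-python sketch of the Producer configuration follows; the broker addresses, topic name and exact batch/linger values are assumptions for illustration, not values fixed by the invention.

```python
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"],
    acks="all",                      # succeed only after all in-sync replicas have the message
    retries=3,                       # automatic retries on transient send failures
    linger_ms=20,                    # wait briefly so nearby messages can be batched
    batch_size=32 * 1024,            # ~32 KB batches to cut per-request network overhead
    compression_type="lz4",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def send(record: dict) -> None:
    # Keying by device_id lets the default hash partitioner keep one device in one partition.
    future = producer.send("CompanyA_Factory1_Line2", key=record["device_id"], value=record)
    future.add_errback(lambda exc: print("send failed:", exc))  # log or alarm on failure
```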
As a further technical solution of the present invention, the S4 specifically includes:
S41, Doris table structure and dynamic partition design, namely, a wide table is created in Doris, partitioned by date range with the dynamic partition function enabled; the system automatically creates, based on the current date, 5 daily partitions from '3 days before' to '1 day after' the current date, named in the form pYYYYMMDD (for example p20250606 represents 6 June 2025), so that the data of the current day and of the last few days always have corresponding partitions and queries are more efficient; a partition retention period is set, for example 30 days, i.e. once a partition has existed for the configured number of days (for example 30 days) the system deletes it automatically, realizing automatic cleanup of expired data and making storage-space management more flexible; at the same time a bucketing (hash bucketing) design is applied, for example bucketing by device_id with 16 buckets, so that during queries and writes different pieces of data are spread across different nodes in parallel, improving performance;
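A hedged sketch of the S41 table definition is given below, submitted over the MySQL protocol exposed by the Doris frontend; the database name, columns and connection details are assumptions, and the dynamic-partition property names follow Apache Doris conventions but may differ between versions.

```python
import pymysql  # the Doris FE speaks the MySQL protocol (default query port 9030)

DDL = """
CREATE TABLE IF NOT EXISTS iot.device_data (
    device_id   VARCHAR(64),
    event_time  DATETIME,
    temperature DOUBLE,
    pressure    DOUBLE,
    humidity    DOUBLE,
    status      INT
)
DUPLICATE KEY(device_id, event_time)
PARTITION BY RANGE(event_time) ()
DISTRIBUTED BY HASH(device_id) BUCKETS 16
PROPERTIES (
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-30",
    "dynamic_partition.end" = "1",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.create_history_partition" = "true",
    "dynamic_partition.history_partition_num" = "3",
    "dynamic_partition.buckets" = "16"
)
"""

conn = pymysql.connect(host="doris-fe.example.local", port=9030, user="root", password="")
with conn.cursor() as cur:
    cur.execute(DDL)  # creates the wide table with dynamic daily partitions
conn.close()
```

Here "start" = "-30" controls how long history is retained, "end" = "1" pre-creates the next day's partition, and "history_partition_num" = "3" creates the three most recent past partitions, which together approximate the "3 days before to 1 day after, retain 30 days" behaviour described above; the exact semantics should be checked against the Doris version in use.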
S42, creating the Routine Load task, namely, a dedicated SQL statement is executed on Doris to create the Routine Load task, specifying which Kafka Topic to pull data from, the desired concurrency, the maximum number of rows or bytes pulled per batch, and how the JSON is parsed; the mapping from JSON fields to table fields is provided by a JSONPaths expression, with a correspondence roughly as follows;
In the Routine Load task configuration, the number of concurrent consumers (such as 4 concurrent threads), the maximum number of rows (such as 5,000) or maximum number of bytes (such as 10 MB) per batch, and a strict-mode switch can be specified; after strict mode is enabled, if fields in a JSON record are missing or the types do not match, the record is treated as dirty data and discarded, and a log is written at the same time, to ensure data consistency;
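The sketch below illustrates what such a Routine Load statement and JSONPaths mapping could look like, again submitted over the MySQL protocol; the job name, topic, broker list and batch limits are illustrative assumptions, and the property names and limits should be checked against the Doris version in use.

```python
import pymysql

ROUTINE_LOAD = r"""
CREATE ROUTINE LOAD iot.device_data_load ON device_data
COLUMNS(device_id, event_time, temperature, pressure, humidity, status)
PROPERTIES (
    "desired_concurrent_number" = "4",
    "max_batch_rows" = "5000",
    "max_batch_size" = "10485760",
    "strict_mode" = "true",
    "format" = "json",
    "jsonpaths" = "[\"$.device_id\", \"$.timestamp\", \"$.data.temperature\", \"$.data.pressure\", \"$.data.humidity\", \"$.status\"]"
)
FROM KAFKA (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "CompanyA_Factory1_Line2",
    "property.group.id" = "doris_device_data"
)
"""

conn = pymysql.connect(host="doris-fe.example.local", port=9030, user="root", password="")
with conn.cursor() as cur:
    cur.execute(ROUTINE_LOAD)  # Doris then pulls and parses JSON from Kafka continuously
conn.close()
```

In practice the millisecond timestamp would typically be converted to a DATETIME through a derived-column expression in the COLUMNS clause; that conversion is omitted here for brevity.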
S43, data loading flow and dirty data processing, namely, Doris Routine Load pulls messages in parallel from the Kafka partitions according to the specified concurrency; once the pulled messages reach the threshold (rows or bytes) they are stored in a temporary file; the system decompresses and parses the temporary file, extracts the fields from the JSON according to the JSONPaths mapping and writes them into an in-memory table; the in-memory table data is automatically distributed to the corresponding partitions according to the partitioning strategy, and the back-end nodes then persist the data in batches to columnar storage;
S44, dynamic partitioning and expiration cleanup, namely, because the dynamic partition function is enabled, Doris automatically evaluates the current date every day and creates 5 partitions from 3 days before to 1 day after the current date; for example, if the current date is 6 June 2025, partitions p20250603, p20250604, p20250605, p20250606 and p20250607 are created; when a partition has existed for the configured number of days (for example p20250506 after 30 days), the system deletes it automatically without manual intervention, saving storage space; if the production environment needs to keep history for longer, only the partition expiration parameter needs to be changed from 30 days to 60 days, 90 days or longer.
As a further technical solution of the present invention, the S5 specifically includes:
s51, end-to-end monitoring indexes comprise acquisition end monitoring, kafka monitoring and Doris monitoring;
s511, monitoring by a collecting end:
The sampling times per second or per minute are counted regularly, and if the sampling times are reduced or interrupted, the PLC connectivity or gateway state needs to be checked;
When the gateway delivers to the back end, if the network condition is bad, retry or packet loss occurs, the ratio of the retry number to the total transmission number is monitored, and when the ratio exceeds a certain threshold (such as 5%), an alarm is sent;
s512, kafka monitoring:
Consumption lag (Consumer Lag), namely the gap between the consumption speed of the downstream consumer (Routine Load) and the delivery speed of the Producer, measured by checking the difference between the current write offset and the committed offset of each partition; when the lag keeps growing and continuously exceeds a certain threshold (such as 10,000 messages), it indicates that downstream consumption cannot keep up and the concurrency needs to be expanded or adjusted (see the sketch after this list);
throughput, namely counting the rate (message/second) and byte rate of writing messages to Kafka by a Producer, and confirming whether the maximum bearing capacity of the cluster is reached or not;
Checking whether the information distribution of each partition is balanced or not, and if the load of one partition is far higher than that of other partitions, increasing the partition number or changing the custom partition strategy;
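A hedged kafka-python sketch for the Consumer Lag check in S512 is shown below; the topic and consumer-group names are assumptions.

```python
from kafka import KafkaConsumer, TopicPartition

def total_lag(topic: str = "CompanyA_Factory1_Line2",
              group_id: str = "doris_device_data") -> int:
    consumer = KafkaConsumer(bootstrap_servers=["broker1:9092"], group_id=group_id)
    partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
    end_offsets = consumer.end_offsets(partitions)      # latest write offset per partition
    lag = 0
    for tp in partitions:
        committed = consumer.committed(tp) or 0         # last offset the group committed
        lag += end_offsets[tp] - committed
    consumer.close()
    return lag

if total_lag() > 10_000:
    print("consumer lag above threshold; consider raising Routine Load concurrency")
```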
S513, doris monitoring:
The route Load delay is measured by measuring the time difference between the latest data in Kafka and the data written into Doris, and the maximum event time (event_time) of the inserted data in a periodic lookup table is compared with the current system time, if the delay lasts for more than 5 minutes, the downstream writing performance bottleneck possibly occurs, and the problem needs to be checked;
Counting the query number per second or the transaction number per second of each BE node during writing, and considering expansion or optimization parameters when a certain node is in a high-load state (such as CPU usage rate exceeding 80 percent);
The data volume of each partition is monitored, and when the data volume of a certain partition far exceeds expectations (such as more than 100 GB), adjusting the bucket count or splitting the historical partitions more finely should be considered;
and S52, performance optimization strategies including Kafka optimization and Doris optimization.
As a further technical solution of the present invention, the S52 specifically includes:
S521 Kafka optimization:
batch sending and compression, namely setting proper batch waiting time (such as tens of milliseconds) at the Producer end, aggregating a plurality of messages arriving in a short time, sending the messages together, and starting a compression algorithm, such as Snappy or LZ4, so that the occupation of network bandwidth can be reduced, and the writing speed of a Broker can be improved;
the number of concurrent consumers is reasonably configured, namely if the number of concurrent of route Load is too small, the consumption speed cannot keep up with the production speed, and if the number of concurrent is too large, the writing task of Doris can contend for resources on a back-end node;
s522, doris optimization:
Columnar storage encoding and compression, namely, for common fields in the table (such as numeric columns like temperature and pressure), columnar compression encodings (such as RLE and dictionary encoding) are adopted, greatly reducing storage usage and improving scan efficiency;
Small file merging, namely if a large number of small files are generated due to frequent writing, the query speed is influenced, and the partition file merging operation can be executed regularly, so that the small files are automatically merged into a large file;
and (3) maintaining statistical information, namely periodically performing analysis operation on the table, and updating the column histogram statistical information so that a query optimizer can select a better execution plan.
As a further technical solution of the present invention, the step S6 specifically includes:
S61, data warehouse integration, namely, the wide table in Doris is used as the fact table and dimension tables are designed according to business requirements; if more complex online analytical processing (OLAP) is needed, data is synchronized to a downstream data warehouse (such as Hive/Hadoop, ClickHouse, Presto and the like) through ETL or scheduled tasks for offline reports and multidimensional analysis;
S62, real-time computation and alerting, namely, a stream processing engine such as Apache Flink or Spark Streaming is used between Kafka and Doris, or in parallel with Doris, to consume the preprocessed data directly and perform complex event processing (such as anomaly detection and trend judgement); when indicators such as temperature or pressure are found to exceed preset thresholds, an SMS or e-mail alarm is triggered immediately;
The machine learning and deep learning applications comprise: performing feature engineering on the historical time-series data stored in Doris by time window (such as day, week or month), aggregating temperature, pressure, vibration and other data into statistical features (maximum, minimum, mean, variance and the like) to build the feature table required for model training; training a time-series prediction model (such as an LSTM or a Transformer-based model) with a Python framework (such as TensorFlow or PyTorch) to predict the short-term health state or failure probability of the equipment, with the data of the past N days or N time steps as input and predicted values for future moments as output; after training is completed and the effect is verified, deploying the model as an online inference service, which periodically fetches the latest time-series data from Doris, compares the generated predictions with the current thresholds and gives early warning of potential failures (a feature-aggregation sketch follows below).
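As an illustration of the feature-engineering step, the pandas sketch below aggregates per-device time series into daily statistical features; the column names and the daily window follow the examples in the text, and everything else is an assumption.

```python
import pandas as pd

def daily_features(df: pd.DataFrame) -> pd.DataFrame:
    """df columns: device_id, event_time (datetime), temperature, pressure."""
    df = df.set_index("event_time")
    # Aggregate each device's readings into daily max/min/mean/variance features.
    feats = (df.groupby("device_id")[["temperature", "pressure"]]
               .resample("1D")
               .agg(["max", "min", "mean", "var"]))
    feats.columns = ["_".join(col) for col in feats.columns]  # flatten the MultiIndex
    return feats.reset_index()
```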
The beneficial effects of the invention are as follows:
1. The data processing delay is significantly reduced and real-time performance improved: by having the gateway publish to Kafka in real time and Doris consume in parallel, end-to-end processing delay drops from the minute level to under 30 seconds, and no more than 1 minute at peak; the data delivery success rate reaches 99.9% under highly concurrent scenarios, effectively avoiding data blackout windows and enabling near-real-time monitoring and early warning. This addresses the problems that traditional industrial systems rely on batch import, often need 5-10 minutes from field acquisition to query, and are prone to data loss or backlog without buffering.
2. Data quality and consistency are improved and gaps and anomalies reduced: with deduplication completed before publishing, the duplicate-record rate drops below 0.1%; the interpolation success rate for missing points exceeds 95%, and placeholder values are used only for gaps longer than 5 seconds; with strict mode at the back end, the dirty-data write rate is below 0.5%. Overall data integrity and consistency are clearly better than in traditional schemes, where writing directly to a database lacks deduplication, interpolation and anomaly interception, duplicate-record rates commonly reach 2%, missing data and noise interference are severe, and subsequent manual cleaning and re-collection are required.
3. Automatic partitioning and storage cleanup reduce operation and maintenance costs and improve scalability: the dynamic partition function of Doris automatically maintains the partitions around the current date and automatically deletes partitions after the configured 30 days without manual intervention; in actual operation the data volume of a single partition is kept stable at 50-80 GB, so query and write performance remain stable; when the number of devices grows, only Kafka Brokers and Doris BE nodes need to be added, and the partition-management workload of operation and maintenance is reduced by about 90%. This addresses the problems that traditional database partitions must be added, archived and deleted manually, which is labour-intensive as data accumulates, and that a single partition can exceed a hundred GB and become a performance bottleneck if partition management is missed.
Drawings
Fig. 1 is a flowchart of a method for storing Doris in equipment data based on industrial internet according to the present invention.
Detailed Description
The invention is further described in connection with the following detailed description, in order to make the technical means, the creation characteristics, the achievement of the purpose and the effect of the invention easy to understand.
Referring to fig. 1, a method for storing Doris based on equipment data of industrial internet includes the following specific steps:
S1, acquiring industrial equipment node data, namely acquiring the data of a field PLC or gateway equipment in real time, and preparing for subsequent cleaning and storage;
S11, selecting an acquisition mode, namely directly connecting the PLC to acquire or forwarding the acquisition by a gateway;
S111, PLC direct-connection acquisition, namely establishing communication via protocols such as Modbus (TCP/RTU), PROFINET, EtherNet/IP and the like between the on-site PLC and an upper computer through Ethernet or a serial port, the system initiating read requests to the PLC at a preset sampling frequency (for example, once every second or once every ten seconds) to acquire real-time values from designated registers or data blocks;
S112, gateway forwarding acquisition, namely, if some field devices do not support the conventional PLC protocols, data is collected through an industrial edge gateway (such as an OPC UA gateway, a Modbus gateway, a Siemens S7 gateway and the like), which speaks the device's native protocol, converts the parsed raw information (usually key-value pairs or a custom binary format) into a unified data format (such as JSON), and forwards it to the back-end system;
S12, a transmission protocol and a throwing mode, namely MQTT throwing or HTTP interface throwing;
S121, MQTT publishing, namely, using the lightweight MQTT protocol, the gateway or acquisition end publishes the processed data as messages to a designated topic (Topic), where topics can be organized hierarchically by enterprise number/factory number/workshop number/equipment number, for example /CompanyA/Factory1/Line2/PLC_001; the QoS (quality of service) level is generally set to "at least once" (QoS 1) or "exactly once" (QoS 2): at least once ensures that messages are not lost although duplicates may occur, while exactly once consumes more resources; the retained-message flag (retain) is generally set to "not retained", so that newly connected subscribers do not receive outdated historical messages; the information carried in each report includes the device number, timestamp, sensor readings and the like; if the connection is broken or the server returns an error, a retry count and retry interval can be configured, and after multiple failed attempts an alarm is raised or a local backup is written, to prevent data loss;
S122, HTTP interface upload, namely, the collected data is sent to a back-end service through an HTTP POST request; interface address example: http://<back-end domain name>:<port>/api/v1/device/data, with Content-Type: application/json and the collected data serialized as JSON in the request body;
If the request fails due to a network problem or a back-end abnormality, the client retries according to a preset retry strategy (such as three retries at half-second intervals); after all retries fail, the error information is recorded in a local log and an alarm is triggered to notify the operation and maintenance personnel;
And S13, timestamp and unique identifier handling, namely, the acquisition end and the gateway use a consistent time standard, generally a millisecond-level Unix timestamp (the number of milliseconds elapsed since 1 January 1970), which ensures that data uploaded by different devices share a unified time reference; for the unique identifier of a device, if the device does not have a globally unique ID, one can be spliced from a fixed prefix (such as the gateway number) and the device's local number on the gateway, for example GW01_PLC05 can serve as the globally unique device number. This ensures that no confusion of devices will occur in subsequent processing and storage;
S2, preprocessing the acquired data, namely performing deduplication, gap filling, denoising and standardization on the originally acquired data, ensuring the quality and consistency of the data subsequently written to storage;
S21, duplicate data filtering and invalid data discarding;
S211, duplicate data filtering, namely, if the same device uploads the same timestamp and the same data values to the back end several times within a short period, they are regarded as duplicate data; after receiving the data, the system checks whether the device number and timestamp are exactly the same as the previous record, and if so, the newly received data is discarded directly to avoid redundant storage;
S212, invalid data discarding, namely, the message uploaded by a device usually contains a status-code field indicating whether the sampling state is normal; if the status code is abnormal, the data is invalid, is discarded directly and does not enter the subsequent processing flow; if the value of a key physical quantity such as temperature or pressure obviously exceeds the reasonable range defined by the product specification or the business (for example, the temperature is above the rated upper limit or below the rated lower limit), it is regarded as abnormal data, discarded and recorded in a log;
S22, data gaps and interpolation, namely, to keep the time series continuous, missing moments in the acquisition process sometimes need to be filled in; the common practice is to check the time span between two adjacent valid samples of the same device: if the interval does not exceed a preset maximum (for example, within 5 seconds), the missing points are filled by linear interpolation between the valid values before and after; if the interval is too large, interpolation is no longer performed and the last valid value or a null value is filled instead, to avoid accumulating errors; for simple cases, last-value filling is used, i.e. the most recent valid sample directly replaces the missing moment; if higher accuracy is required, advanced interpolation algorithms (such as polynomial or spline interpolation) can be used at the application layer, but their implementation and computation costs are relatively high, so they are rarely used in common industrial Internet of Things scenarios;
S23, data cleaning and standardization, including unit unification, noise filtering, and field renaming and supplementing;
S231, unit unification, namely converting all physical quantities into standard units; for example, if the temperature uploaded by a field device is in Fahrenheit, the system converts it to Celsius before the next step, so that subsequent analysis and storage are consistent;
S232, noise filtering, namely, data sometimes needs smoothing because of noise picked up on site; common methods are moving-average or median filtering, such as averaging the last N samples to eliminate short pulse interference, or taking the median of the last N samples to further remove the influence of extreme values;
S233, field renaming and supplementing, namely, if a downstream system (such as the Kafka producer) requires fixed field names and formats, the original field names need to be renamed uniformly, for example a 'temp' field becomes 'temperature' and a 'press' field becomes 'pressure' after cleaning; if a message lacks certain predefined fields, default values can be supplemented (such as 0 or 'unknown'), or null values can be set according to business requirements;
S24, encapsulating the data as JSON, namely, after cleaning is finished, the data is organized into a unified JSON structure, which generally contains the following parts: device_id, the unique device identifier;
timestamp, the millisecond-level Unix timestamp at which the server received the data; data, the key-value pairs of the core sensor readings (such as temperature, pressure, humidity and the like); status, the device status code indicating whether sampling is normal; and metadata, optional auxiliary fields such as gateway number, signal strength and acquisition-end IP;
Example JSON (illustrative only):
{ "device_id": "GW01_PLC05", "timestamp": 171XXXXXXX000,
"data": { "temperature": 42.8, "pressure": 1.05, "humidity": 58.2}, "status": 0, "metadata": { "gateway_id": "GW01", "signal_strength": -65}};
After the fixed format is adopted, when the fixed format is subsequently sent to Kafka or an HTTP interface is called, the data of the JSON structure can be directly used as a message body, and the meaning of each field is kept consistent;
S3, publishing the preprocessed data to Kafka, namely achieving data buffering and decoupling through a distributed message queue, so that downstream consumers can consume in parallel with good scalability;
S31, Kafka cluster and Topic planning, namely, at least 3 Kafka Brokers are deployed at the back end to ensure high availability of the system, and Topic names are designed according to business logic, for example with a 'CompanyA_' prefix,
so that data from the same production line or the same type of equipment converges under the same Topic for centralized processing; an appropriate number of partitions is configured for the Topic according to the peak message volume and the system's concurrency capacity: the more partitions, the stronger the downstream parallel consumption capacity, but the greater the Broker management overhead; generally the peak message rate of the system is estimated (such as 100,000 messages per second) and, combined with the maximum throughput of each partition, a suitable partition count such as 8, 16 or 32 is finally determined;
s32, kafka Producer configuration;
S321: a Broker list, writing the addresses of all the brokers into the configuration of the Producer, e.g. Broker1:9092, broker2:9092, broker3:9092;
S322, message acknowledgement mechanism (acks): it is recommended to set it to 'all', meaning a send succeeds only after the message has been written to all in-sync replicas, ensuring that data is not lost;
S323, retry count: the Producer automatically retries when a send fails, for example the retry count is set to 3; if sending still fails after several retries, an alarm is triggered or a local log is written, so that data is not silently lost;
S324, a serializer, wherein a character string is used for serializing when the equipment number is transmitted as a Key, and UTF-8 code is also used for serializing when the JSON character string is used as a Value;
s325, adopting a partition allocation strategy, namely adopting a default hash partition strategy, namely taking a module of the total number of partitions after hashing by using equipment numbers, so that all data of the same equipment can be ensured to fall into the same partition, and the time sequence and sequential consumption of the data of the same equipment are ensured;
S33, message sending and exception handling, wherein the Producer calls an asynchronous sending interface and monitors a sending result in a callback function, if the sending is successful, a successful log is recorded or ignored, if the sending is failed, the reason of the sending failure (such as network timeout, broker is unavailable and the like) is recorded to the log, and retrying is performed according to a retry strategy;
The throughput optimization, namely, batch transmission parameters can be properly set, for example, the batch transmission is carried out once when a certain number (such as 500 bars) is accumulated or a certain byte number (such as 32 KB) is reached, so that network overhead caused by frequent small batch transmission is avoided;
S4, Doris consuming the Kafka data and storing it, namely, Doris pulls the JSON data from Kafka in real time and writes it into a relational table, while automatic partitioning and expiration cleanup of the data are realized by means of dynamic partitioning;
S41, Doris table structure and dynamic partition design, namely, a wide table is created in Doris, partitioned by date range with the dynamic partition function enabled; the system automatically creates, based on the current date, 5 daily partitions from '3 days before' to '1 day after' the current date, named in the form pYYYYMMDD (for example p20250606 represents 6 June 2025), so that the data of the current day and of the last few days always have corresponding partitions and queries are more efficient; a partition retention period is set, for example 30 days, i.e. once a partition has existed for the configured number of days (for example 30 days) the system deletes it automatically, realizing automatic cleanup of expired data and making storage-space management more flexible; at the same time a bucketing (hash bucketing) design is applied, for example bucketing by device_id with 16 buckets, so that during queries and writes different pieces of data are spread across different nodes in parallel, improving performance;
S42, creating the Routine Load task, namely, a dedicated SQL statement is executed on Doris to create the Routine Load task, specifying which Kafka Topic to pull data from, the desired concurrency, the maximum number of rows or bytes pulled per batch, and how the JSON is parsed; the mapping from JSON fields to table fields is provided by a JSONPaths expression, with a correspondence roughly as follows;
In the Routine Load task configuration, the number of concurrent consumers (such as 4 concurrent threads), the maximum number of rows (such as 5,000) or maximum number of bytes (such as 10 MB) per batch, and a strict-mode switch can be specified; after strict mode is enabled, if fields in a JSON record are missing or the types do not match, the record is treated as dirty data and discarded, and a log is written at the same time, to ensure data consistency;
S43, data loading flow and dirty data processing, namely, Doris Routine Load pulls messages in parallel from the Kafka partitions according to the specified concurrency; once the pulled messages reach the threshold (rows or bytes) they are stored in a temporary file; the system decompresses and parses the temporary file, extracts the fields from the JSON according to the JSONPaths mapping and writes them into an in-memory table; the in-memory table data is automatically distributed to the corresponding partitions according to the partitioning strategy, and the back-end nodes then persist the data in batches to columnar storage;
S44, dynamic partitioning and expiration cleanup, namely, because the dynamic partition function is enabled, Doris automatically evaluates the current date every day and creates 5 partitions from 3 days before to 1 day after the current date; for example, if the current date is 6 June 2025, partitions p20250603, p20250604, p20250605, p20250606 and p20250607 are created; when a partition has existed for the configured number of days (for example p20250506 after 30 days), the system deletes it automatically without manual intervention, saving storage space; if the production environment needs to keep history for longer, only the partition expiration parameter needs to be changed from 30 days to 60 days, 90 days or longer;
S5, monitoring and performance optimization, namely monitoring an end-to-end link from acquisition to storage, finding out bottlenecks and anomalies in time, and performing targeted optimization;
s51, end-to-end monitoring indexes comprise acquisition end monitoring, kafka monitoring and Doris monitoring;
s511, monitoring by a collecting end:
The sampling times per second or per minute are counted regularly, and if the sampling times are reduced or interrupted, the PLC connectivity or gateway state needs to be checked;
When the gateway delivers to the back end, if the network condition is bad, retry or packet loss occurs, the ratio of the retry number to the total transmission number is monitored, and when the ratio exceeds a certain threshold (such as 5%), an alarm is sent;
s512, kafka monitoring:
Consumption lag (Consumer Lag), namely the gap between the consumption speed of the downstream consumer (Routine Load) and the delivery speed of the Producer, measured by checking the difference between the current write offset and the committed offset of each partition; when the lag keeps growing and continuously exceeds a certain threshold (such as 10,000 messages), it indicates that downstream consumption cannot keep up and the concurrency needs to be expanded or adjusted;
throughput, namely counting the rate (message/second) and byte rate of writing messages to Kafka by a Producer, and confirming whether the maximum bearing capacity of the cluster is reached or not;
Checking whether the information distribution of each partition is balanced or not, and if the load of one partition is far higher than that of other partitions, increasing the partition number or changing the custom partition strategy;
S513, doris monitoring:
The route Load delay is measured by measuring the time difference between the latest data in Kafka and the data written into Doris, and the maximum event time (event_time) of the inserted data in a periodic lookup table is compared with the current system time, if the delay lasts for more than 5 minutes, the downstream writing performance bottleneck possibly occurs, and the problem needs to be checked;
Counting the query number per second or the transaction number per second of each BE node during writing, and considering expansion or optimization parameters when a certain node is in a high-load state (such as CPU usage rate exceeding 80 percent);
The data volume of each partition is monitored, and when the data volume of a certain partition far exceeds expectations (such as more than 100 GB), adjusting the bucket count or splitting the historical partitions more finely should be considered;
s52, performance optimization strategies including Kafka optimization and Doris optimization;
S521 Kafka optimization:
batch sending and compression, namely setting proper batch waiting time (such as tens of milliseconds) at the Producer end, aggregating a plurality of messages arriving in a short time, sending the messages together, and starting a compression algorithm, such as Snappy or LZ4, so that the occupation of network bandwidth can be reduced, and the writing speed of a Broker can be improved;
the number of concurrent consumers is reasonably configured, namely if the number of concurrent of route Load is too small, the consumption speed cannot keep up with the production speed, and if the number of concurrent is too large, the writing task of Doris can contend for resources on a back-end node;
s522, doris optimization:
Columnar storage encoding and compression, namely, for common fields in the table (such as numeric columns like temperature and pressure), columnar compression encodings (such as RLE and dictionary encoding) are adopted, greatly reducing storage usage and improving scan efficiency;
Small file merging, namely if a large number of small files are generated due to frequent writing, the query speed is influenced, and the partition file merging operation can be executed regularly, so that the small files are automatically merged into a large file;
The statistical information maintenance, which is to periodically execute analysis operation on the table, update the column histogram statistical information and enable the query optimizer to select a better execution plan;
S6, extension: secondary utilization and downstream analysis, supporting subsequent data-warehouse integration, real-time computation and machine-learning applications on the premise of ensuring data quality and real-time performance;
S61, data warehouse integration, namely, the wide table in Doris is used as the fact table and dimension tables are designed according to business requirements; if more complex online analytical processing (OLAP) is needed, data is synchronized to a downstream data warehouse (such as Hive/Hadoop, ClickHouse, Presto and the like) through ETL or scheduled tasks for offline reports and multidimensional analysis;
S62, real-time computation and alerting, namely, a stream processing engine such as Apache Flink or Spark Streaming is used between Kafka and Doris, or in parallel with Doris, to consume the preprocessed data directly and perform complex event processing (such as anomaly detection and trend judgement); when indicators such as temperature or pressure are found to exceed preset thresholds, an SMS or e-mail alarm is triggered immediately;
The machine learning and deep learning applications comprise: performing feature engineering on the historical time-series data stored in Doris by time window (such as day, week or month), aggregating temperature, pressure, vibration and other data into statistical features (maximum, minimum, mean, variance and the like) to build the feature table required for model training; training a time-series prediction model (such as an LSTM or a Transformer-based model) with a Python framework (such as TensorFlow or PyTorch) to predict the short-term health state or failure probability of the equipment, with the data of the past N days or N time steps as input and predicted values for future moments as output; after training is completed and the effect is verified, deploying the model as an online inference service, which periodically fetches the latest time-series data from Doris, compares the generated predictions with the current thresholds and gives early warning of potential failures.
From the above description, the embodiments of the invention have the following advantages. First, the data processing delay is significantly reduced and real-time performance improved: by having the gateway publish to Kafka in real time and Doris consume in parallel, end-to-end processing delay drops from the minute level to under 30 seconds, and no more than 1 minute at peak; the data delivery success rate reaches 99.9% under highly concurrent scenarios, effectively avoiding data blackout windows and enabling near-real-time monitoring and early warning, whereas traditional industrial systems mostly rely on batch import, often need 5-10 minutes from field acquisition to query, and are prone to data loss or backlog without buffering.
Second, data quality and consistency are improved and gaps and anomalies reduced: with deduplication completed before publishing, the duplicate-record rate drops below 0.1%; the interpolation success rate for missing points exceeds 95%, and placeholder values are used only for gaps longer than 5 seconds; with strict mode at the back end, the dirty-data write rate is below 0.5%; overall data integrity and consistency are clearly better than in traditional schemes, where writing directly to a database lacks deduplication, interpolation and anomaly interception, duplicate-record rates commonly reach 2%, missing data and noise interference are severe, and subsequent manual cleaning and re-collection are required.
Third, automatic partitioning and storage cleanup reduce operation and maintenance costs and improve scalability: the dynamic partition function of Doris automatically maintains the partitions around the current date and automatically deletes partitions after the configured 30 days without manual intervention; in actual operation the data volume of a single partition is kept stable at 50-80 GB, so query and write performance remain stable; when the number of devices grows, only Kafka Brokers and Doris BE nodes need to be added, and the partition-management workload of operation and maintenance is reduced by about 90%, whereas traditional database partitions must be added, archived and deleted manually, which is labour-intensive as data accumulates, and a single partition can exceed a hundred GB and become a performance bottleneck if partition management is missed.
It will be appreciated by persons skilled in the art that the above discussion of any embodiment is merely exemplary and is not intended to imply that the scope of the invention is limited to these examples; within the spirit of the invention, technical features of the above embodiments or of different embodiments may also be combined, the steps may be carried out in any order, and many other variations of the different aspects of the invention exist as described above, which are not presented in detail for the sake of brevity.
The present invention is intended to embrace all such alternatives, modifications and variations that fall within the broad scope of the specification. Therefore, any omission, modification, equivalent replacement or improvement made within the spirit and principles of the present invention shall be included in the scope of protection of the present invention.

Claims (7)

1. A Doris method for storing device data based on the Industrial Internet, characterized by comprising the following specific steps:
S1: acquiring industrial equipment node data: collecting data from on-site PLC or gateway devices in real time in preparation for subsequent cleaning and storage;
S2: preprocessing the acquired data: deduplicating, completing, denoising and standardizing the raw collected data to ensure the quality and consistency of the data subsequently written to the database;
S3: publishing the preprocessed data to Kafka: buffering and decoupling the data through a distributed message queue, so that downstream consumers can consume in parallel with good scalability;
S4: Doris consuming and storing the Kafka data: Doris pulls JSON data from Kafka in real time and writes it into a relational table, while dynamic partitioning automates data partitioning and expiration cleanup;
S41: Doris table structure and dynamic-partition design: creating a wide table in Doris, the table being partitioned by date range with the dynamic-partitioning function enabled; based on the current date, the system automatically creates five partitions, from three days before the current date to one day after it; a partition retention period is set, and when a partition has existed for the configured number of days the system automatically deletes it; the table is also bucketed;
S42: creating a Routine Load task: executing a dedicated SQL statement on Doris to create a Routine Load task that specifies the Kafka Topic from which the task pulls data, the degree of concurrency to use, the maximum number of rows or bytes pulled per batch, and how the JSON is parsed; the mapping from JSON fields to table fields is provided by a JSONPaths file; the Routine Load task configuration allows the number of concurrent consumers, the maximum rows or bytes per batch and the strict-mode switch to be specified;
S43: data-loading flow and dirty-data handling: Doris Routine Load pulls messages from the Kafka partitions in parallel at the specified concurrency, and messages reaching the configured threshold are stored in a temporary file; the system decompresses and parses the temporary file, extracts the fields from the JSON according to the JSONPaths mapping and writes them into an in-memory table; the in-memory table data are automatically assigned to the corresponding partitions according to the partitioning strategy, and the back-end nodes then persist the data in batches to column storage; for records that fail to parse, have mismatched field types or lack required fields, Routine Load places the record in a "dirty data" queue and decides, according to the "maximum dirty-data ratio" threshold, whether to ignore it or raise an alarm; if the dirty-data ratio of a batch exceeds a set value, a system alarm is triggered and the JSON format or the JSONPaths mapping is checked manually for errors;
S44: dynamic partitioning and expiration cleanup: with the dynamic-partitioning function enabled, Doris automatically determines the current date every day and creates the five partitions from three days before the current date to one day after it; when a partition has existed for the configured number of days, the system automatically deletes it;
S5: monitoring and performance optimization: monitoring the end-to-end link from acquisition to storage, discovering bottlenecks and anomalies in time, and performing targeted optimization;
S51: end-to-end monitoring indicators: including acquisition-side monitoring, Kafka monitoring and Doris monitoring;
S52: performance-optimization strategies: including Kafka optimization and Doris optimization;
S6: extension: secondary utilization and downstream analysis: on the premise of guaranteed data quality and real-time performance, supporting subsequent data-warehouse integration, real-time computing and machine-learning applications;
S61: data-warehouse integration: constructing the wide table in Doris into a fact table and designing dimension tables according to business needs; if more complex online analytical processing is required, synchronizing the data to a downstream data warehouse through ETL or scheduled tasks for offline reporting and multidimensional analysis; within the same Doris cluster, views or layered table structures may also be combined to provide data of different granularities to BI tools in a timely manner;
S62: real-time computing and alerting: between Kafka and Doris, or in parallel outside Doris, using the Apache Flink or Spark Streaming stream-processing engine to consume the preprocessed data directly and perform complex event processing; when indicators such as temperature or pressure exceed preset thresholds, an SMS or e-mail alarm is triggered immediately; based on the CEP pattern, multiple consecutive states or pattern recognitions can be configured;
S63: machine-learning and deep-learning applications: performing feature engineering on the historical time-series data stored in Doris according to time windows, aggregating the data into statistical features, and building the feature table required for model training; training a time-series prediction model with a Python framework to predict the short-term health state or failure probability of the equipment, the data of the past N days and N time points being usable as input during training and the output being the predicted value for a future period; and, after the model has been trained and its effect verified, deploying it as an online inference service that periodically obtains the latest time-series data from Doris, generates prediction results, compares them with the current thresholds, and gives early warning of potential failures.
2. The Doris method for storing device data based on the Industrial Internet according to claim 1, characterized in that said S1 specifically comprises:
S11: acquisition-mode selection: direct PLC acquisition or acquisition forwarded by a gateway;
S12: transmission protocol and upload mode: upload via MQTT or via an HTTP interface;
S13: timestamp and unique-identifier handling: a consistent time base, generally a millisecond-level Unix timestamp, is used at both the acquisition end and the gateway; for the unique identifier of a device, if the device itself has no globally unique ID, one can be composed on the gateway by concatenating a fixed prefix with the device's local number.
3. The Doris method for storing device data based on the Industrial Internet according to claim 2, characterized in that said S12 specifically comprises:
S121: MQTT upload: using the lightweight MQTT protocol, the gateway or acquisition end publishes the processed data as messages to designated topics, the topics being organizable hierarchically as "enterprise number/factory number/workshop number/equipment number"; the QoS level is typically selectable between "at least once" and "exactly once"; the retained-message flag is generally set to "not retained"; if the connection is lost or the server returns an error, the number of retries and the retry interval can be configured, and after repeated failed attempts an alarm is raised or the data are written to a local backup;
S122: HTTP interface upload: sending the collected data to the back-end service via an HTTP POST request, with the Content-Type specified in the HTTP request header; if a request fails because of a network problem or a back-end exception, the client retries according to a preset retry strategy; after all attempts fail, the error information is recorded in the local log and an alarm is triggered to notify operation-and-maintenance personnel.
4. The Doris method for storing device data based on the Industrial Internet according to claim 1, characterized in that said S2 specifically comprises:
S21: filtering duplicate data and discarding invalid data;
S22: filling and interpolating missing data: checking the time span between two adjacent valid samples of the same device; if the interval does not exceed the preset maximum interval, the missing point is estimated from the linear relationship between the two valid values; if the interval is too large, no interpolation is performed and the previous valid value or a null value is filled directly, so as to avoid error accumulation; for simple cases, "nearest-value filling" is used, directly substituting the valid sample closest to the missing moment; where higher accuracy is required, an advanced interpolation algorithm may be used at the application layer;
S23: data cleaning and standardization: including unit unification, noise filtering, and field renaming and supplementation;
S24: encapsulating the data as JSON: after the above cleaning is completed, organizing the data into a unified JSON structure, which usually contains device_id, timestamp, status and metadata; with this fixed format, the JSON string can be used directly as the message body when subsequently sending to Kafka or calling the HTTP interface, keeping the meaning of each field consistent.
5. The Doris method for storing device data based on the Industrial Internet according to claim 1, characterized in that said S3 specifically comprises:
S31: Kafka cluster and Topic planning: deploying at least three Kafka Brokers on the back end, designing Topic names according to the business logic, and aggregating data from the same production line or the same type of equipment under the same Topic; configuring an appropriate number of partitions for the Topic according to the peak message volume and the concurrency capability of the system; setting the replication factor to 3, so that each message has a replica on three Brokers;
S32: Kafka Producer configuration;
S33: message sending and exception handling: the Producer calls the asynchronous send interface and listens for the result in a callback function: if sending succeeds, a success log is recorded or the result is ignored; if sending fails, the reason for the failure is recorded in the log and the send is retried according to the retry strategy; if it still fails after multiple retries, the message is placed in a local temporary buffer queue and delivery is attempted again after background recovery or manual intervention.
6. The Doris method for storing device data based on the Industrial Internet according to claim 5, characterized in that said S32 specifically comprises:
S321: Broker list: writing the addresses of all Brokers into the Producer configuration;
S322: message-acknowledgement mechanism: the recommended setting is "all", meaning a message counts as successfully sent only after it has been written to all In-Sync replicas, ensuring that no data are lost;
S323: retry count: retrying automatically when sending fails; if it still fails after multiple retries, an alarm is triggered or the failure is written to the local log;
S324: serializers: string serialization is used when the device number is sent as the Key, and UTF-8 serialization is likewise used when the JSON string is sent as the Value;
S325: partition-assignment strategy: the default hash-partitioning strategy is used, hashing the device number and taking the result modulo the total number of partitions.
7. The Doris method for storing device data based on the Industrial Internet according to claim 6, characterized in that said S52 specifically comprises:
S521: Kafka optimization:
batch sending and compression: setting an appropriate batch wait time on the Producer side, aggregating messages that arrive within a short period and sending them together, and enabling a compression algorithm;
reasonably configuring the number of concurrent consumers: finding the optimal balance for the concurrency according to the CPU, memory and disk I/O metrics of the BE nodes;
S522: Doris optimization:
columnar encoding and compression: applying columnar compression encoding to the frequently used fields of the table, greatly reducing storage usage and improving scan efficiency;
small-file merging: partition-file compaction can be executed periodically, so that small files are automatically merged into larger files;
statistics maintenance: analysis operations are performed on the table periodically to update the column histogram statistics, so that the query optimizer can choose better execution plans.
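The table design and Routine Load job recited in claim 1 (steps S41-S43) can be pictured with the following minimal Python sketch, which holds candidate SQL statements as plain strings. The database, table, column, topic and broker names are illustrative assumptions rather than part of the disclosed method, and the property keys (dynamic_partition.*, desired_concurrent_number, jsonpaths, and so on) should be verified against the documentation of the Doris version actually deployed; the statements could be submitted to the Doris FE through any MySQL-protocol client.

```python
# Illustrative sketch only: candidate DDL and Routine Load statements for the
# scheme of claim 1.  All identifiers and property values are assumptions; the
# dynamic_partition start/end window of -3/+1 days mirrors steps S41 and S44.

CREATE_WIDE_TABLE = """
CREATE TABLE IF NOT EXISTS iot.device_metrics (
    device_id    VARCHAR(64) NOT NULL,
    ts           DATETIME    NOT NULL,
    temperature  DOUBLE,
    pressure     DOUBLE,
    vibration    DOUBLE,
    status       VARCHAR(32)
)
DUPLICATE KEY(device_id, ts)
PARTITION BY RANGE(ts) ()
DISTRIBUTED BY HASH(device_id) BUCKETS 16
PROPERTIES (
    "dynamic_partition.enable"    = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start"     = "-3",
    "dynamic_partition.end"       = "1",
    "dynamic_partition.prefix"    = "p",
    "dynamic_partition.buckets"   = "16"
);
"""

CREATE_ROUTINE_LOAD = """
CREATE ROUTINE LOAD iot.load_device_metrics ON device_metrics
PROPERTIES (
    "desired_concurrent_number" = "3",
    "max_batch_rows"            = "200000",
    "strict_mode"               = "true",
    "format"                    = "json",
    "jsonpaths"                 = "[\\"$.device_id\\",\\"$.timestamp\\",\\"$.status.temperature\\",\\"$.status.pressure\\",\\"$.status.vibration\\",\\"$.status.state\\"]"
)
FROM KAFKA (
    "kafka_broker_list"              = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic"                    = "factory01.device_metrics",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
"""

if __name__ == "__main__":
    # The statements can be run through any MySQL-protocol client pointed at
    # the Doris FE (for example pymysql.connect(host="fe-host", port=9030, ...)).
    print(CREATE_WIDE_TABLE)
    print(CREATE_ROUTINE_LOAD)
```

In practice a COLUMNS clause (or an upstream conversion) would map the millisecond Unix timestamp in the JSON onto the DATETIME partition column; that detail is omitted here for brevity.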
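Similarly, the HTTP upload path of claim 3 (step S122) amounts to a POST with a bounded retry policy. The sketch below uses the requests library; the endpoint URL, retry count, back-off interval and payload shape are assumptions made purely for illustration.

```python
# Illustrative sketch only: HTTP POST upload with a simple retry policy in the
# spirit of claim 3, step S122.  Endpoint and retry parameters are assumptions.
import json
import logging
import time

import requests

INGEST_URL = "http://gateway.example.local/api/v1/device-data"  # hypothetical endpoint
MAX_RETRIES = 3
RETRY_INTERVAL_S = 2.0

def upload(sample: dict) -> bool:
    """POST one preprocessed sample as JSON; retry on failure, then log and alarm."""
    body = json.dumps(sample)
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            resp = requests.post(
                INGEST_URL,
                data=body,
                headers={"Content-Type": "application/json"},
                timeout=5,
            )
            if resp.status_code == 200:
                return True
            logging.warning("attempt %d: server returned %d", attempt, resp.status_code)
        except requests.RequestException as exc:
            logging.warning("attempt %d failed: %s", attempt, exc)
        time.sleep(RETRY_INTERVAL_S)
    logging.error("upload failed after %d attempts, sample kept locally: %s",
                  MAX_RETRIES, body)
    # A real system would also trigger an alarm to operation-and-maintenance staff here.
    return False
```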
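The deduplication, short-gap interpolation and JSON encapsulation of claim 4 (steps S21, S22 and S24) can likewise be outlined with standard-library Python. The 5-second maximum interpolation gap, the 1-second nominal sampling period and the field names are assumptions chosen for the example; longer gaps are left for nearest-value or null filling as described in S22.

```python
# Illustrative sketch only: deduplication, linear interpolation of short gaps and
# JSON encapsulation along the lines of claim 4.  Thresholds and field names are
# assumptions, not values fixed by the method.
import json

MAX_GAP_MS = 5_000          # interpolate only if the gap does not exceed 5 s
SAMPLE_PERIOD_MS = 1_000    # nominal sampling period assumed for the example

def deduplicate(samples):
    """Drop samples whose (device_id, ts) pair has already been seen."""
    seen, out = set(), []
    for s in samples:
        key = (s["device_id"], s["ts"])
        if key not in seen:
            seen.add(key)
            out.append(s)
    return out

def fill_gaps(samples, field="temperature"):
    """Linearly interpolate missing points between adjacent valid samples.
    Gaps longer than MAX_GAP_MS are left untouched (handled by nearest-value
    or null filling elsewhere, per S22)."""
    if not samples:
        return []
    samples = sorted(samples, key=lambda s: s["ts"])
    out = []
    for prev, cur in zip(samples, samples[1:]):
        out.append(prev)
        gap = cur["ts"] - prev["ts"]
        if SAMPLE_PERIOD_MS < gap <= MAX_GAP_MS:
            steps = gap // SAMPLE_PERIOD_MS
            for i in range(1, steps):
                frac = i / steps
                out.append({
                    "device_id": prev["device_id"],
                    "ts": prev["ts"] + i * SAMPLE_PERIOD_MS,
                    field: prev[field] + frac * (cur[field] - prev[field]),
                    "interpolated": True,
                })
    out.append(samples[-1])
    return out

def to_json(sample):
    """Wrap one cleaned sample into the unified JSON structure of step S24."""
    return json.dumps({
        "device_id": sample["device_id"],
        "timestamp": sample["ts"],
        "status": {k: v for k, v in sample.items() if k not in ("device_id", "ts")},
        "metadata": {"source": "gateway-01"},   # hypothetical metadata
    })
```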
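Finally, the producer settings of claims 5 and 6 (acknowledgement by all in-sync replicas, bounded retries, string key and UTF-8 JSON value serialization, hash partitioning on the device number) map naturally onto a client such as kafka-python; the sketch below is one possible configuration, with broker addresses, topic name and tuning values chosen purely for illustration.

```python
# Illustrative sketch only: a producer configured along the lines of claims 5-6
# using the kafka-python client.  Broker list, topic and tuning values are
# assumptions; kafka-python's default partitioner hashes the message key, which
# here is the device number.
import json
import logging

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"],
    acks="all",                 # wait for all in-sync replicas (S322)
    retries=5,                  # automatic retries on transient failures (S323)
    linger_ms=50,               # small batching window (S521)
    compression_type="gzip",    # enable compression (S521)
    key_serializer=str.encode,                                  # device number as string key (S324)
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),   # JSON value, UTF-8 (S324)
)

def send(topic: str, device_id: str, payload: dict) -> None:
    """Send asynchronously and handle the result in callbacks (S33)."""
    future = producer.send(topic, key=device_id, value=payload)
    future.add_callback(lambda md: logging.debug("ok %s[%d]@%d", md.topic, md.partition, md.offset))
    future.add_errback(lambda exc: logging.error("send failed, buffering locally: %s", exc))

if __name__ == "__main__":
    send("factory01.device_metrics", "dev-001",
         {"device_id": "dev-001", "timestamp": 1718000000000,
          "status": {"temperature": 70.5}, "metadata": {}})
    producer.flush()
```

Because the key serializer receives the device number, the client's default partitioner hashes that key and takes it modulo the partition count, which corresponds to the assignment strategy of S325.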
CN202510873588.XA 2025-06-27 2025-06-27 Device data storage Doris method based on industrial Internet Active CN120386780B (en)

Publications (2)

Publication Number Publication Date
CN120386780A CN120386780A (en) 2025-07-29
CN120386780B (en) 2025-09-26

Legal Events

Date Code Title Description
PB01 Publication
SE01 Entry into force of request for substantive examination
GR01 Patent grant