
CN117950820A - Data production method, data processing method and device - Google Patents


Info

Publication number
CN117950820A
Authority
CN
China
Prior art keywords
data
task
target
processing
target task
Prior art date
Legal status
Pending
Application number
CN202311316101.5A
Other languages
Chinese (zh)
Inventor
邓成东 (Deng Chengdong)
Current Assignee
Mashang Consumer Finance Co Ltd
Original Assignee
Mashang Consumer Finance Co Ltd
Priority date
Filing date
Publication date
Application filed by Mashang Consumer Finance Co Ltd filed Critical Mashang Consumer Finance Co Ltd
Priority to CN202311316101.5A priority Critical patent/CN117950820A/en
Publication of CN117950820A publication Critical patent/CN117950820A/en
Pending legal-status Critical Current

Classifications

    • G: PHYSICS
    • G06: COMPUTING OR CALCULATING; COUNTING
    • G06F: ELECTRIC DIGITAL DATA PROCESSING
    • G06F 9/00: Arrangements for program control, e.g. control units
    • G06F 9/06: Arrangements for program control using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F 9/46: Multiprogramming arrangements
    • G06F 9/48: Program initiating; program switching, e.g. by interrupt
    • G06F 9/4806: Task transfer initiation or dispatching
    • G06F 9/4843: Task transfer initiation or dispatching by program, e.g. task dispatcher, supervisor, operating system
    • G06F 9/4881: Scheduling strategies for dispatcher, e.g. round robin, multi-level priority queues

Landscapes

  • Engineering & Computer Science (AREA)
  • Software Systems (AREA)
  • Theoretical Computer Science (AREA)
  • Physics & Mathematics (AREA)
  • General Engineering & Computer Science (AREA)
  • General Physics & Mathematics (AREA)
  • Information Retrieval, Db Structures And Fs Structures Therefor (AREA)

Abstract

The present disclosure provides a data production method, a data processing method, and a device. The data production method includes: acquiring a task identifier of a target task and a first task running state; acquiring a first processing batch identifier corresponding to the target task according to the task identifier and the first task running state; generating a second processing batch identifier of the target task according to the first processing batch identifier, and acquiring n pieces of business data of the target task in the second processing batch; and generating n task messages for the n pieces of business data according to the task identifier and the second processing batch identifier. According to the embodiments of the present disclosure, the target task can be processed efficiently while still allowing its task running state to be flexibly controlled.

Description

Data production method, data processing method and device
Technical Field
The present disclosure relates to the field of computer technologies, and in particular, to a data production method, a data processing method and apparatus, an electronic device, and a computer readable storage medium.
Background
In the digital era, an enterprise application typically generates a large amount of business data during operation. To improve data utilization efficiency, an enterprise usually salvages (i.e., fetches) the relevant business data from this large volume according to certain salvage rules, and then performs data processing on the salvaged business data.
In the related art, for such an application scenario, a data processing task is typically set up on a server and executed there in a single-threaded manner. During task execution, while the task state of the data processing task is "running", the thread acquires multiple pieces of business data satisfying a preset condition from a data source, for example a database storing the enterprise's business data, and processes the acquired pieces one by one; for example, it performs data cleaning and then feeds the cleaned data into a quality inspection model for screening, to check whether the business data contains non-compliant data. After all acquired pieces have been processed, or if the task state of the data processing task is manually set to "paused" or "ended", the thread stops executing the data processing task.
In the related art, because this task processing method relies on a single thread to handle the data processing task, it can flexibly control starting and stopping the task, but it suffers from low efficiency.
Disclosure of Invention
The present disclosure provides a data production method, a data processing method and device, an electronic device, and a computer-readable storage medium.
In a first aspect, the present disclosure provides a data production method, applied to a data production end, where the method includes:
acquiring a task identifier of a target task and a first task running state;
acquiring a first processing batch identifier corresponding to the target task according to the task identifier and the first task running state;
generating a second processing batch identifier of the target task according to the first processing batch identifier, and acquiring n pieces of business data of the target task in the second processing batch, wherein the business data corresponding to the target task comprises the n pieces of business data, and n is a positive integer;
and generating n task messages for the n pieces of business data according to the task identifier and the second processing batch identifier.
According to the data production method provided by the embodiments of the present disclosure, the data production end does not need to fetch all business data of the target task at once; instead, it can produce the data batch by batch. Compared with producing all business data in one pass, this avoids the data blocking caused by piling a large amount of business data into a cache, and, when the task running state of the target task is manually adjusted, it also reduces the work consumed in cleaning up a large amount of cached business data, avoiding wasted resources. In addition, packaging the n pieces of business data acquired in one processing batch as task messages decouples the data production end from the data consumption end, so the running state of the target task can be adjusted flexibly. Finally, because the business data of one processing batch is handled through task messages, the data consumption end can process the task messages carrying the target task's business data in a multi-process and/or multi-threaded manner as needed, without being limited by the single thread of the data production end, which improves the processing efficiency of the target task.
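To make the batch-wise production flow concrete, the following is a minimal in-memory Python sketch. The function names, the "pending" state string, and the use of `queue.Queue` as a stand-in for the partitioned task message queue are all illustrative assumptions, not the patent's actual implementation.

```python
import queue
import time

# In-memory stand-in (an assumption): a real deployment would write to
# partitioned message middleware serving as the target task message queue.
task_message_queue = queue.Queue()

def fetch_batch(task_id, batch_id, n):
    """Stand-in for salvaging n pieces of business data for one processing batch."""
    return [{"record_id": i} for i in range(n)]

def produce_batch(task_id, run_status, prev_batch_id, n=3):
    """One production round: runs only while the task is pending, generates a
    new processing batch identifier, and emits one task message per piece."""
    if run_status != "pending":          # paused/ended tasks produce nothing
        return None
    new_batch_id = "batch-" + time.strftime("%Y%m%d%H%M%S")
    for record in fetch_batch(task_id, new_batch_id, n):
        # each task message carries the task id and the processing batch id
        task_message_queue.put(
            {"task_id": task_id, "batch_id": new_batch_id, "data": record})
    return new_batch_id

batch = produce_batch(5, "pending", None)   # task_id 5, as in the example record
```

A subsequent call with a "paused" state produces nothing, which is how the single-threaded producer honors a manually adjusted running state.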
In a second aspect, the present disclosure provides a data processing method, applied to a data consumption end, the method including:
receiving a data processing request for a target task;
responding to the data processing request, and acquiring k task messages from a target partition of a target task message queue based on a first thread, wherein k is a positive integer, and the task messages in the target partition of the target task message queue are written by a data production end corresponding to the data consumption end according to the data production method of the first aspect;
distributing the k task messages to a plurality of second threads for concurrent processing based on the first thread, wherein the second threads are used for processing the business data in the distributed task messages.
According to the data processing method provided by the embodiments of the present disclosure, the data consumption end does not need to be coupled with the data production end, so it is not constrained by the production end producing business data in a single thread. After receiving a data processing request for a target task, the data consumption end uses a first thread to obtain k task messages from a target partition of a target task message queue and, based on that first thread, distributes the k task messages to a plurality of second threads for concurrent processing. This increases the processing speed of the target task's business data and thereby improves the processing efficiency of the target task.
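The first-thread/second-thread division of labor can be sketched as follows; `partition`, `consume`, and the worker-pool size are hypothetical stand-ins for the target partition and the second threads, not part of the patent.

```python
import queue
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for one target partition of the task message queue.
partition = queue.Queue()
for i in range(4):
    partition.put({"task_id": 5, "record_id": i})

processed = []  # record ids handled by the second threads

def process_message(msg):
    """Second-thread work: process the business data carried by one task message."""
    processed.append(msg["record_id"])  # list.append is thread-safe in CPython

def consume(k):
    """First thread: take up to k task messages from the partition, then fan
    them out to a pool of second threads for concurrent processing."""
    msgs = [partition.get() for _ in range(min(k, partition.qsize()))]
    with ThreadPoolExecutor(max_workers=2) as second_threads:
        list(second_threads.map(process_message, msgs))
    return len(msgs)

handled = consume(4)
```

The key design point mirrors the text: the single first thread only fetches and dispatches, while the actual data processing runs concurrently in the pool.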
In a third aspect, the present disclosure provides a task processing system comprising: the system comprises a data production end, a data consumption end and a task detection end;
The data production end is configured to perform production processing on service data corresponding to a target task according to the data production method described in the first aspect;
The data consumption end is configured to process the service data that is produced and processed by the data production end according to the data processing method described in the second aspect;
the task detection end is configured to obtain a target production data count and a target consumption data count corresponding to the target task when the task production state of the target task is a first preset state, and update the task running state of the target task to a second preset state when the values of the target production data count and the target consumption data count are the same and are both greater than a preset value.
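The detection rule in the third aspect reduces to a small comparison. A sketch under stated assumptions: the state strings and the function name are illustrative, and "finished" stands in for the unspecified second preset state.

```python
def update_run_status(produce_status, produced_count, consumed_count,
                      run_status, threshold=0):
    """Task-detection sketch: once production is complete (first preset state)
    and every produced message has also been consumed, update the running
    state (second preset state). State strings are illustrative assumptions."""
    if (produce_status == "production_complete"
            and produced_count == consumed_count
            and produced_count > threshold):
        return "finished"
    return run_status

status = update_run_status("production_complete", 100, 100, "running")
```

Note that the `> threshold` guard keeps a task that produced nothing from being marked finished, matching the "both greater than a preset value" condition.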
In a fourth aspect, the present disclosure provides a data production device, applied to a data production end, the device comprising:
The first acquisition unit is used for acquiring a task identifier of a target task and a first task running state;
The second acquisition unit is used for acquiring a first processing batch identifier corresponding to the target task according to the task identifier and the first task running state;
The data acquisition unit is used for generating a second processing batch identifier of the target task according to the first processing batch identifier, and acquiring n pieces of business data of the target task in the second processing batch, wherein the business data corresponding to the target task comprises the n pieces of business data, and n is a positive integer;
and the data production unit is used for generating n task messages for the n pieces of business data according to the task identifier and the second processing batch identifier.
In a fifth aspect, the present disclosure provides a data processing apparatus for use at a data consumer, the apparatus comprising:
a receiving unit for receiving a data processing request for a target task;
a third obtaining unit, configured to obtain k task messages from a target partition of a target task message queue based on a first thread in response to the data processing request, where k is a positive integer, where a task message in the target partition of the target task message queue is written by a data production end corresponding to the data consumption end according to the data production method of the first aspect;
The data processing unit is used for distributing the k task messages to a plurality of second threads for concurrent processing based on the first thread, wherein the second threads are used for processing the business data in the distributed task messages.
In a sixth aspect, the present disclosure provides an electronic device comprising: at least one processor; and a memory communicatively coupled to the at least one processor; wherein the memory stores one or more computer programs executable by the at least one processor, the one or more computer programs being executable by the at least one processor to enable the at least one processor to perform the data production method of the first aspect or the data processing method of the second aspect described above.
In a seventh aspect, the present disclosure provides a computer-readable storage medium having stored thereon a computer program, wherein the computer program, when executed by a processor, implements the data production method of the first aspect or the data processing method of the second aspect described above.
It should be understood that the description in this section is not intended to identify key or critical features of the embodiments of the disclosure, nor is it intended to be used to limit the scope of the disclosure. Other features of the present disclosure will become apparent from the following specification.
Drawings
The accompanying drawings are included to provide a further understanding of the disclosure, and are incorporated in and constitute a part of this specification, illustrate embodiments of the disclosure and together with the description serve to explain the disclosure, without limitation to the disclosure. The above and other features and advantages will become more readily apparent to those skilled in the art by describing in detail exemplary embodiments with reference to the attached drawings, in which:
FIG. 1 is a schematic diagram of an implementation environment of a data production method and a data processing method according to an embodiment of the disclosure;
FIG. 2 is a flow chart of a data production method provided in an embodiment of the present disclosure;
FIG. 3 is a flow chart of a data processing method provided by an embodiment of the present disclosure;
FIG. 4 is a block diagram of a data production device provided by an embodiment of the present disclosure;
FIG. 5 is a block diagram of a data processing apparatus provided by an embodiment of the present disclosure;
fig. 6 is a block diagram of an electronic device according to an embodiment of the present disclosure.
Detailed Description
For a better understanding of the technical solutions of the present disclosure, exemplary embodiments of the present disclosure will be described below with reference to the accompanying drawings, in which various details of the embodiments of the present disclosure are included to facilitate understanding, and they should be considered as merely exemplary. Accordingly, one of ordinary skill in the art will recognize that various changes and modifications of the embodiments described herein can be made without departing from the scope and spirit of the present disclosure. Also, descriptions of well-known functions and constructions are omitted in the following description for clarity and conciseness.
Embodiments of the disclosure and features of embodiments may be combined with each other without conflict.
As used herein, the term "and/or" includes any and all combinations of one or more of the associated listed items.
The terminology used herein is for the purpose of describing particular embodiments only and is not intended to be limiting of the disclosure. As used herein, the singular forms "a," "an," and "the" are intended to include the plural forms as well, unless the context clearly indicates otherwise. It will be further understood that the terms "comprises" and/or "comprising," when used in this specification, specify the presence of stated features, integers, steps, operations, elements, and/or components, but do not preclude the presence or addition of one or more other features, integers, steps, operations, elements, components, and/or groups thereof. The terms "connected" or "connected," and the like, are not limited to physical or mechanical connections, but may include electrical connections, whether direct or indirect.
Unless otherwise defined, all terms (including technical and scientific terms) used herein have the same meaning as commonly understood by one of ordinary skill in the art. It will be further understood that terms, such as those defined in commonly used dictionaries, should be interpreted as having a meaning that is consistent with their meaning in the context of the relevant art and the present disclosure, and will not be interpreted in an idealized or overly formal sense unless expressly so defined herein.
In the related art, to handle the application scenario in which business data satisfying a salvage rule must be fetched from the overall business data and then processed, the data processing task is generally handled in a single-threaded manner on one server node. Although this task processing method can flexibly control starting and stopping the task, it cannot use the multi-core processor of a single server node, or multiple server nodes, to process the salvaged business data concurrently, so it often suffers from low efficiency.
For example, in the financial field, enterprises generally need to keep updating their quality inspection models as the quality inspection business rules change. After a period of time they therefore need to salvage, from business data such as voice text data, the pieces that satisfy the new quality inspection rules and process them, for example by cleaning the data and adding labels to build a new training set, on which the quality inspection model is then trained. In such a scenario, the current data processing task may have to be ended mid-execution because the salvage condition, i.e., the data screening condition, was set unreasonably, or it may have to be paused because computing resources must be allocated to other, higher-priority tasks. To control starting and stopping flexibly, the business data satisfying the screening condition is therefore usually salvaged all at once in a single-threaded manner and then processed piece by piece; however, processing the task this way generally results in low processing efficiency.
In view of this, in the embodiments of the present disclosure, although the data salvage operation, i.e., producing or acquiring the business data, still needs to be performed in a single-node, single-threaded manner so that the task can be suspended at any time, the acquired business data does not have to be processed synchronously by that same thread. Instead, it can first be cached, for example in a database or in message middleware, so that one or more separate processes and/or threads can concurrently process the cached business data. Considering that reading and writing large volumes of data through a database is relatively slow, the present disclosure caches the business data in a target task message queue corresponding to the target task: a data production end produces the business data into the queue, and a data consumption end reads it out for data processing. Moreover, the data production end acquires the business data in batches of n pieces rather than all at once, so the task running state of the target task can still be flexibly controlled, and the data blocking and resource waste caused by fetching and caching all business data of the target task at one time are avoided. Meanwhile, since the data production end only needs to produce data into the target task message queue, the data consumption end is decoupled from it and is not limited by the production end's single thread; when processing the task messages that carry the business data in the target task message queue, the data consumption end can process them concurrently in a multi-threaded or multi-process manner, which improves the processing efficiency of the target task.
Referring to fig. 1, a schematic diagram of an implementation environment of a data production method and a data processing method according to an embodiment of the disclosure is shown. As shown in fig. 1, the implementation environment may include a data production end 101, a data consumption end 102, and a network 103, and may further include a task detection end 104.
The data production end 101 and the data consumption end 102 may be physical servers, for example blade servers or rack servers, or they may be virtual servers, for example a server cluster deployed in the cloud; this is not limited here.
The network 103 may be a wireless network or a wired network, and may be a local area network or a wide area network. The data producer 101 and the data consumer 102 may communicate over a network 103.
In the disclosed embodiments, the data production end 101 may be used to participate in implementing a data production method according to any embodiment of the disclosure. For example, it can be used to: acquire a task identifier of a target task and a first task running state; acquire a first processing batch identifier corresponding to the target task according to the task identifier and the first task running state; generate a second processing batch identifier of the target task according to the first processing batch identifier, and acquire n pieces of business data of the target task in the second processing batch, wherein the business data corresponding to the target task comprises the n pieces of business data, and n is a positive integer; and generate n task messages for the n pieces of business data according to the task identifier and the second processing batch identifier.
The data consumption end 102 may be used to participate in implementing a data processing method according to any embodiment of the present disclosure. For example, it can be used to: receive a data processing request for a target task; in response to the data processing request, acquire k task messages from a target partition of a target task message queue based on a first thread, wherein k is a positive integer, and the task messages in the target partition are written by the data production end 101 corresponding to the data consumption end 102 based on the data production method of the embodiments of the present disclosure; and distribute the k task messages to a plurality of second threads for concurrent processing based on the first thread, where the second threads are used to process the business data in the distributed task messages.
In addition, as shown in fig. 1, in the embodiment of the present disclosure, the implementation environment may further include a task detection end 104, where the task detection end 104 may be, for example, a physical server, for example, a blade server, a rack server, or may be a virtual server, for example, a server cluster deployed in the cloud, and is not limited herein. The task detection end 104 may communicate with the data production end 101 or the data consumption end 102 based on the network 103.
The task detection end 104 may be configured to: and under the condition that the task production state of the target task is a first preset state, acquiring a target production data count and a target consumption data count corresponding to the target task, and under the condition that the numerical values of the target production data count and the target consumption data count are the same and are larger than the preset numerical value, updating the task running state of the target task to be a second preset state.
It will be appreciated that the implementation environment shown in fig. 1 is merely illustrative and is in no way intended to limit the disclosure, its application or uses. For example, although fig. 1 shows only one data producing end 101, one data consuming end 102, and one task detecting end 104, it is not meant to limit the respective numbers, and multiple data producing ends 101, multiple data consuming ends 102, and multiple task detecting ends 104 may be included in the implementation environment.
In order to process a target task efficiently while still allowing its task running state to be flexibly controlled, an embodiment of the present disclosure provides a data production method; please refer to fig. 2, which is a flowchart of a data production method provided by an embodiment of the present disclosure. The method may be applied at a data production end, which may be, for example, the data production end 101 shown in fig. 1.
It should be noted that, in an actual implementation, a plurality of servers may be preset. An operator may create a target task through a system configuration interface, and the system then sends the target task to one of the plurality of servers for processing through a preset allocation mechanism; the allocated server can be regarded as the data production end. The preset allocation mechanism may be based on Feign's client-side load balancing algorithm, or on some other allocation mechanism, which is not limited here.
For example, suppose three servers (server 1, server 2, and server 3) are preset for data production, that is, for data salvage. After an operator creates a target task, the system may, based on Feign's client-side load balancing algorithm, select one of the servers by measuring the number of running data production tasks and/or the computing resource utilization of each server, for example selecting server 3, which has the lowest resource utilization, as the data production end of the target task.
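Under the stated assumption that a client-side choice picks the least-loaded server, a minimal selection rule might look like this. The field names and the tie-breaking metric are illustrative; the patent leaves the exact balancing rule open.

```python
def pick_producer(servers):
    """Pick the candidate with the fewest running production tasks, breaking
    ties by the lowest resource utilization (illustrative metric)."""
    return min(servers, key=lambda s: (s["running_tasks"], s["cpu_util"]))

servers = [
    {"name": "server1", "running_tasks": 4, "cpu_util": 0.7},
    {"name": "server2", "running_tasks": 4, "cpu_util": 0.5},
    {"name": "server3", "running_tasks": 2, "cpu_util": 0.3},
]
chosen = pick_producer(servers)   # server3 carries the lowest load
```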
As shown in fig. 2, the data production method provided in the embodiment of the present disclosure may include the following steps S201 to S204, which are described in detail below.
Step S201, acquiring a task identifier of a target task and a first task running state.
The task identifier may be used to uniquely identify the target task.
The target task may be any task for producing service data meeting a preset data screening condition and performing data processing on the service data, where the preset data screening condition may be set according to a service type corresponding to the service data.
For example, in the financial field, in the case that approval is obtained by both parties of a voice call, a target task may be used to screen a corpus satisfying quality inspection business rules from voice data and/or text data generated by customer service interacting with a user, so as to update and train a quality inspection model based on the corpus construction training set, where the quality inspection model may be a model for identifying whether an input voice and/or an input text contains non-compliant content, for example, whether the input voice and/or the input text contains non-compliant content such as abuse, violent prompt, etc.
For another example, in the electric power field, the target task may be used to screen the target electric power data meeting the user-defined screening condition from a large amount of original electric power data collected by the data collection end, and convert the target electric power data to obtain data suitable for being displayed for the user to view, for example, convert the target electric power data to obtain chart data suitable for the user to view.
In the embodiments of the present disclosure, the processing of the business data may be one or more of data cleaning, data conversion, data labeling, and the like; of course, in a specific implementation the data processing procedure may be set as required, for example it may be an audit process on the business data, which is not specifically limited here.
The task running state is used for representing the current processing state of the target task, and the task running state can comprise a to-be-processed state, a pause state and an end state; wherein the pending status may be used to indicate that the target task is in an "in-flight" status; the pause state may be used to indicate that the target task is in a "pause" state and the end state may be used to indicate that the target task is in an "end" state; of course, this is merely illustrative, and the running state of the target task may be set according to the need in actual implementation, and is not particularly limited herein.
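The three running states described above can be modeled with a small enum; the string values and the helper name are assumptions for illustration only.

```python
from enum import Enum

class RunStatus(Enum):
    """The three task running states named in the text; the string values
    are illustrative assumptions."""
    PENDING = "pending"   # target task is runnable / being processed
    PAUSED = "paused"
    ENDED = "ended"

def may_produce(status):
    """The data production end fetches a new batch only while the task is pending."""
    return status is RunStatus.PENDING
```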
It should be noted that, in the embodiment of the present disclosure, in order to facilitate flexible control of a task running state of a target task, that is, to facilitate flexible starting, suspending, or ending of the target task, when the data producer side processes the target task, the task processing method may be executed in a single-thread manner, and the thread may be any idle thread in a thread pool preset by the data producer side and used for executing the task processing method.
Step S202, according to the task identification and the first task running state, a first processing batch identification corresponding to the target task is obtained.
The processing batch identifier may be used to denote the processing batch in which the business data of the target task is currently acquired; it may take a form such as "batch-20230101132535".
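Given the example form above, a batch identifier can be derived from a timestamp as follows; the derivation rule is an assumption, since the text only shows the format.

```python
from datetime import datetime

def next_batch_id(now=None):
    """Build a processing batch identifier in the 'batch-<timestamp>' form
    shown in the text; deriving it from the current time is an assumption."""
    now = now or datetime.now()
    return "batch-" + now.strftime("%Y%m%d%H%M%S")

bid = next_batch_id(datetime(2023, 1, 1, 13, 25, 35))  # the example in the text
```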
That is, to avoid the data blocking that can occur in the related art when all business data of the target task is fetched into the cache at once for processing, the present disclosure acquires data in batches for data production: in each processing batch only part of the target task's business data is acquired, and another part is acquired in the next processing batch.
In an actual implementation, to facilitate managing the task information of the target task, a target task record of the target task may be stored in a "data production task table" of a third database, which may be, for example, a MySQL database. The target task record is used to record the task information of the target task and may include, for example, a task identifier field (task_id), a current maximum data record identifier field (curr_max_record_id), a task running state field (run_status), and a task production state field (produce_status). For example, a target task record of the target task may be expressed as: (task_id: 5, curr_max_record_id: 100001, run_status: "running", produce_status: "producing").
Wherein, the task identification field is used for storing the unique identification of the task, for example, the task identification of the target task can be '5';
The current maximum data record identification field is used for storing the data record identification with the maximum value in the service data acquired under the current processing batch;
The task running state field is used for storing the running state of the task; its value may be a numerical value representing a state such as running, suspended, or ended, and it defaults to the value representing running, that is, the task running state of the target task defaults to indicating that the target task is in a to-be-processed state;
The value of the task production state field may be a value indicating a first preset state, which may be used to indicate that the target task is in a "production complete" state, or a value indicating a second preset state, which may be used to indicate that the target task is in an "in production" state.
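The target task record described above can be sketched as a simple data structure (the numeric state encodings and defaults here are assumptions for illustration, not values fixed by the disclosure):

```python
from dataclasses import dataclass

RUNNING, SUSPENDED, ENDED = 0, 1, 2          # assumed run_status encodings
IN_PRODUCTION, PRODUCTION_COMPLETE = 0, 1    # assumed produce_status encodings

@dataclass
class TaskRecord:
    task_id: int
    curr_max_record_id: int
    run_status: int = RUNNING            # defaults to "running" (to-be-processed)
    produce_status: int = IN_PRODUCTION  # second preset state: "in production"

# e.g. the record (task_id: 5, curr_max_record_id: 100001, ...)
rec = TaskRecord(task_id=5, curr_max_record_id=100001)
```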
In some embodiments, obtaining the first processing batch identifier corresponding to the target task according to the task identifier and the first task running state may include: acquiring the first processing batch identifier according to the task identifier in the case that the first task running state indicates that the target task is in a to-be-processed state.
Step S203, a second processing batch identifier of the target task is generated according to the first processing batch identifier, and n pieces of service data of the target task in the second processing batch are acquired, where the service data corresponding to the target task includes the n pieces of service data, and n is a positive integer.
Specifically, in the embodiment of the present disclosure, when processing the target task, the data production end does not produce all the service data corresponding to the target task at one time. This avoids data blocking, and also solves the problem that, in the case that the target task is manually suspended, the data consumption end wastes computing resources by continuing to process the service data already produced by the data production end. Instead, when processing the target task, the data production end may acquire the service data corresponding to the target task in batches, that is, acquire only part of all the service data corresponding to the target task in each batch.
The n pieces of service data may be obtained by the data production end by querying from a database for storing service data according to a data screening condition corresponding to the target task, or may be obtained by other manners, which are not limited herein.
Step S204, generating n task messages of the n pieces of service data according to the task identifier and the second processing batch identifier.
It should be noted that, in some embodiments, after generating n task messages of the n pieces of service data, the n task messages may be written into a target partition of a target task message queue; the target Partition (Partition) may be one or more partitions under the Topic (Topic) corresponding to the target task in the target task message queue, that is, a Partition set for the target task and used for storing the task message corresponding to the target task.
The task message corresponding to the service data may include a task identifier (task_id), a data record identifier (record_id), a processing batch identifier (child_batch_no), and a timestamp (timestamp).
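A task message carrying the four fields above might be assembled as follows (a sketch; the helper name is hypothetical):

```python
import time

def build_task_messages(task_id, child_batch_no, record_ids):
    # One task message per piece of business data in the current batch.
    ts = int(time.time() * 1000)
    return [
        {"task_id": task_id,
         "record_id": rid,
         "child_batch_no": child_batch_no,
         "timestamp": ts}
        for rid in record_ids
    ]
```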
In some embodiments, the target task message queue may be a message queue implemented based on message middleware Kafka, i.e., the target task message queue may be a Kafka message queue. In the embodiment of the disclosure, in the case that the target task message queue is a Kafka message queue, the target partition may be at least one partition under a theme set for the target task in the message queue; of course, in the case that the target task message queue is another type of message queue, the partition may also be set according to the characteristics of the corresponding message queue, which is not limited herein.
Specifically, in the related art, after the service data corresponding to the target task is acquired, data processing is performed on all the service data in a synchronous manner; in this manner, the multi-core computing resources of a single server cannot be utilized, and multiple servers cannot be used to process the service data concurrently, so the processing efficiency is generally low.
In order to solve this problem, in the embodiment of the present disclosure, on one hand, the data production end performs data production processing by acquiring the business data corresponding to the target task in batches; on the other hand, within one processing batch, after acquiring the n pieces of service data of the target task, the data production end does not directly transmit them to the data consumption end for data processing, but generates n task messages corresponding to the n pieces of service data and writes them into the target task message queue. The data production end and the data consumption end are thereby decoupled through the target task message queue, so that the task running state of the target task can be flexibly controlled, and the data consumption end can acquire the written n task messages from the target task message queue and process them in a multi-service-node and/or multi-thread manner, thereby realizing concurrent processing of the service data in the n task messages and improving processing efficiency.
Therefore, according to the data production method provided by the embodiment of the present disclosure, the data production end does not need to fetch all the service data of the target task at one time, but can perform data production in batches. Compared with producing all the service data at once, this avoids data blocking caused by stacking a large amount of service data in the cache; meanwhile, when the task running state of the target task is manually adjusted, this processing manner also reduces the workload consumed in cleaning a large amount of cached service data, avoiding resource waste. In addition, the n pieces of service data acquired in one processing batch are carried in task messages, which decouples the data production end and the data consumption end, so that the running state of the target task can be flexibly adjusted. Furthermore, because the service data in one processing batch is processed through task messages, the data consumption end can process the task messages containing the service data of the target task in a multi-process and/or multi-thread manner as needed, without being limited by the single thread of the data production end, thereby improving the processing efficiency of the target task.
In some embodiments, before performing step S203, that is, before performing the step of generating the second processing batch identifier of the target task and acquiring the n pieces of service data of the target task in the second processing batch, the method may further include: determining whether the target task meets a preset data production condition according to the first processing batch identifier; and in the case that the target task meets the preset data production condition, performing the step of generating the second processing batch identifier of the target task and acquiring the n pieces of business data of the target task in the second processing batch.
The preset data production condition may be: whether the remaining amount of data to be processed corresponding to the target task is smaller than or equal to a preset threshold. The remaining amount of data to be processed refers to the number of pieces, among the n pieces of business data of the target task in the first processing batch, that have not been processed by the data consumption end. Of course, in actual implementation, the preset data production condition may be set as needed, and is not specifically limited herein.
Specifically, taking the above preset data production condition as an example, determining whether the target task meets the preset data production condition according to the first processing batch identifier may be: querying the processed data amount according to the first processing batch identifier, where the processed data amount is the number of pieces, among the n pieces of business data of the target task in the first processing batch, that have been read and processed by the data consumption end, and the first processing batch is the processing batch represented by the first processing batch identifier; obtaining the remaining amount of data to be processed in the first processing batch according to the processed data amount; and determining that the target task meets the preset data production condition in the case that the remaining amount of data to be processed is smaller than or equal to the preset threshold.
The preset threshold may be 500, or may be set as needed, which is not particularly limited herein.
In actual implementation, the third database may further be provided with a data execution result table, which may be used to store processed data records, each corresponding to one piece of service data. A processed data record may include the following fields: result identifier (result_id), task identifier (task_id), data record identifier (record_id), and processing batch identifier (child_batch_no). Each processed data record may be generated by the data consumption end and written into the data execution result table; for example, the data consumption end may generate and store the corresponding processed data record into the data execution result table after completing data processing on the service data in any task message.
The processed data amount may be obtained by the data production end by querying the data execution result table according to the current first processing batch identifier. For example, if the task identifier of the target task is 5, the first processing batch identifier is "batch-20230101132535", and tbl01 denotes the data execution result table, the processed data amount may be obtained by the query SELECT COUNT(*) FROM tbl01 WHERE task_id = 5 AND child_batch_no = 'batch-20230101132535'. Of course, this is merely illustrative; in actual implementation, the processed data amount may be obtained in other manners. For example, a processed data count corresponding to the first processing batch identifier of the target task may be set in the redis database; after processing each piece of service data, the data consumption end updates the processed data count in redis according to the processing batch corresponding to that service data, and when the data production end needs the processed data amount, it queries this count to obtain the value.
The remaining amount of data to be processed may be obtained from the total number of pieces of service data acquired in the first processing batch and the queried processed data amount. For example, in the case that the total number acquired in the first processing batch is n, the remaining amount of data to be processed may be obtained as the difference between n and the processed data amount.
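The condition check above reduces to a simple comparison (function names are assumptions):

```python
def pending_remainder(batch_total, processed_count):
    # Remaining amount of data to be processed in the first processing batch.
    return batch_total - processed_count

def meets_production_condition(batch_total, processed_count, threshold=500):
    # Production of the next batch is allowed once the remainder drops
    # to the preset threshold or below.
    return pending_remainder(batch_total, processed_count) <= threshold
```

With the numbers used later in the text: 5000 pieces in batch 1 and 4600 processed leaves a remainder of 400, which permits the next batch; only 2000 processed leaves 3000, which does not.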
Additionally, in some embodiments, the method may further include: determining that the target task does not meet the preset data production condition in the case that the remaining amount of data to be processed is greater than the preset threshold; and after waiting for a preset time interval, performing again the step of determining whether the target task meets the preset data production condition according to the first processing batch identifier.
That is, if the remaining amount of data to be processed is greater than the preset threshold, for example, greater than 500 pieces, it indicates that the data already produced has not yet been fully consumed and processed by the data consumption end. Continuing to produce data in this case may cause a data backlog, and when the target task is manually suspended or ended, cleaning up the backlogged data would consume workload and cause resource waste.
In actual implementation, waiting for the preset time interval may be implemented by putting the single thread of the data production end to sleep for a preset time, for example, 5 seconds, or may be implemented in other manners, which is not limited herein.
In some embodiments, if it is determined after waiting for the preset time interval that the target task meets the preset data production condition, then before performing the step of generating the second processing batch identifier of the target task and acquiring the n pieces of service data of the target task in the second processing batch, the method may further include: acquiring a second task running state of the target task after waiting for the preset time interval; and performing the step of generating the second processing batch identifier of the target task and acquiring the n pieces of business data of the target task in the second processing batch in the case that the second task running state indicates that the target task is in a to-be-processed state.
That is, if the target task is determined, according to the first processing batch identifier, to satisfy the preset data production condition only after waiting for the preset time interval, the task running state of the target task may have been manually changed during the wait, for example, manually adjusted to a suspended or ended state. Therefore, in this case, the currently latest second task running state may be acquired again, and the processing of acquiring the n pieces of business data in the next processing batch is performed only in the case that the second task running state indicates that the target task is in the to-be-processed state.
For example, taking the preset threshold as 500 and the number of pieces of service data acquired in processing batch 1 as 5000 as an example, if the queried processed data amount is 4600, the remaining amount of data to be processed in processing batch 1 is 400. Since 400 is smaller than the preset threshold 500, it can be determined that the data consumption end is about to finish processing the service data of this batch. In order to improve data processing efficiency and prevent the data consumption end from idling with no service data to process, the data production end can determine in this case that the target task meets the condition for continuing to produce data, so it can generate the next batch, that is, generate the identifier of processing batch 2, and acquire the n pieces of service data in processing batch 2 to continue data production;
if the queried processed data amount is 2000, the remaining amount of data to be processed in the batch is 3000. Since this value is greater than the preset threshold 500, a large number of task messages to be processed by the data consumption end in the current batch are backlogged in the target task message queue; in order to avoid data backlog, it can be determined that the target task does not currently meet the data production condition, and data production processing is suspended;
In the case that the target task does not currently meet the data production condition, the data production end can put its thread to sleep for a preset time, such as 5 seconds, and acquire a new remaining amount of data to be processed again after 5 seconds. If the new remaining amount is smaller than or equal to the preset threshold 500, it can be determined that the target task meets the preset data production condition; if the newly acquired remaining amount is still greater than the preset threshold 500, the data production end can continue to put the thread to sleep for the preset time;
in the case that the target task meets the preset data production condition after the data production end has slept for the preset time, considering that the task running state of the target task may have changed during the sleep, the data production end can acquire the currently latest task running state of the target task from the run_status field of the data production task table in the third database. If the newly acquired task running state indicates that the target task is still in the to-be-processed state, the steps of generating the next batch, that is, the identifier of processing batch 2, and acquiring the n pieces of service data in processing batch 2 to continue data production processing can be performed.
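The wait-and-retry flow in the example above can be sketched as a polling loop (a simplified sketch; the callback name is an assumption, and a real implementation would also re-check run_status before resuming production):

```python
import time

def wait_until_producible(get_processed_count, batch_total,
                          threshold=500, sleep_seconds=5):
    # Sleep and re-check until the to-be-processed remainder in the
    # current batch falls to the threshold or below.
    while batch_total - get_processed_count() > threshold:
        time.sleep(sleep_seconds)
    return True
```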
Therefore, based on the method provided by the embodiment of the present disclosure, data production processing can be performed while the running state of the target task remains flexibly controllable, and the processing efficiency of the target task is improved.
In some embodiments, acquiring the n pieces of business data of the target task in the second processing batch in step S203 includes: querying a first data identifier according to the task identifier, where the first data identifier is the data identifier with the largest value among the data identifiers of the n pieces of business data in the first processing batch, and the first processing batch is the processing batch represented by the first processing batch identifier; and acquiring, according to a preset number m, n pieces of service data whose data identifiers are greater than the first data identifier from the first database as the n pieces of service data in the second processing batch. The first database is used for storing all business data corresponding to the target task, and the business data in the first database is arranged in ascending order of its data identifiers; the preset number m limits the number of pieces of service data acquired from the business database in each processing batch to be smaller than or equal to m, and n is smaller than or equal to m.
The first database is a database for storing service data of the target task, and may be a mysql database or may be another database, which is not limited herein.
The preset number m may be 5000, for example, or may be set as needed, and is not particularly limited herein.
That is, when acquiring the service data corresponding to the target task in each processing batch, the data production end may by default acquire 5000 pieces per processing batch, and acquire the actual number of remaining pieces if fewer than 5000 pieces of service data meeting the data screening condition remain in the first database for the target task.
In actual implementation, the querying the first data identifier according to the task identifier may be: and inquiring from a target task record corresponding to the target task to obtain the first data identifier according to the task identifier and the first processing batch identifier.
Specifically, after acquiring the n pieces of service data of the target task in a processing batch, the data production end may store the data record identifier with the largest value among the n pieces of service data into the target task record of the data production task table in the third database. When the n pieces of service data of the next batch need to be acquired, in order to avoid acquiring data repeatedly, the maximum data identifier of the service data already acquired, that is, the first data identifier, may be queried from the target task record; then, according to the first data identifier, n new pieces of service data may be queried from all the service data corresponding to the target task, which are arranged in ascending order in the first database, as the n pieces of service data in the second processing batch.
Taking the preset number m as 5000 as an example, if the number of pieces of service data of the target task to be processed in the first database is 8000, then in the current processing batch 2, 5000 pieces of service data can be acquired and the task messages corresponding to these 5000 pieces written into the target task message queue for processing by the data consumption end. When the target task meets the data production condition again and the next processing batch, that is, processing batch 3, begins, only 3000 pieces of service data to be processed remain in the database, so the actual number of remaining pieces, that is, 3000 pieces, is acquired in processing batch 3, and the task messages corresponding to these 3000 pieces are written into the target task message queue.
Additionally, in such an embodiment, the method may further include: updating the task production state of the target task to a first preset state in the case that the number n of pieces of business data in the second processing batch is smaller than the preset number m.
In the embodiment of the disclosure, the first preset state may be used to indicate that the target task is in a "production complete" state.
That is, if the number n of pieces of business data in the second processing batch is smaller than the preset number m, it can be inferred that all the business data corresponding to the target task has completed production processing; in this case, the task production state of the target task may be updated to the first preset state indicating "production complete". For example, in the above example, the 3000 pieces of service data of the target task remaining in the first database in processing batch 3 have all been fetched and written into the target task message queue, so all the service data of the target task has been produced. The task production state of the target task may therefore be updated to the first preset state identifying "production complete"; for example, the value of the produce_status field in the target task record corresponding to the target task in the data production task table of the third database may be updated to "1" to indicate the "production complete" state, so that the task detection end corresponding to the data production end can determine, by detecting this field, whether to set the task running state of the target task to the "ended" state.
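The batch fetch and the completion check described above can be sketched together (an in-memory stand-in for the database query; function names are assumptions):

```python
def fetch_next_batch(record_ids_ascending, curr_max_record_id, m=5000):
    # Query at most m record ids greater than the current maximum, which
    # emulates paging over business data ordered by data identifier.
    return [r for r in record_ids_ascending if r > curr_max_record_id][:m]

def production_complete(batch, m=5000):
    # Fewer than m pieces in a batch means all business data is produced.
    return len(batch) < m
```

With 8000 pending records, one batch fetches 5000 pieces, the next fetches the remaining 3000, and production_complete then reports that production has finished.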
In some embodiments, the method may further include: writing the n task messages into target partitions of the target task message queue, where there are multiple target partitions, and the number of target partitions is determined according to the number of data processing nodes at the data consumption end. In this case, writing the n task messages into the target partitions of the target task message queue in step S204 includes: writing the n task messages into the multiple target partitions of the target task message queue according to a preset allocation mechanism.
The target task message queue in the embodiment of the present disclosure may be a kafka message queue, in which one topic generally corresponds to one class of task messages, each topic generally includes multiple partitions, each partition may be used to store multiple task messages, and different partitions may be allocated to different servers to achieve high scalability, load balancing, and dynamic adjustment capabilities. The number of partitions under each topic is typically set when the topic is created, and can be configured by adjusting the configuration item num.partitions.
In order to avoid wasting computing resources, in the embodiment of the present disclosure, when there are multiple data processing nodes in the data consumption end corresponding to the data production end, the number of target partitions corresponding to the target task in the target task message queue may be determined according to the number of data processing nodes. Taking the number of data processing nodes as X and the number of target partitions as P as an example, P satisfies the condition of being greater than or equal to X.
For example, if the target task corresponds to topic1 in the kafka message queue, and the number of data processing nodes in the data consumption end for processing the service data of the target task is 8, the number of target partitions under topic1 for storing the task messages of the target task may be set to a value greater than or equal to 8. For example, the number of target partitions may be set to 8, in which case one data processing node correspondingly consumes the task messages in one target partition; of course, the number of target partitions may also be 16, in which case one data processing node consumes the task messages in two target partitions.
Therefore, according to the method provided by the embodiment of the disclosure, the number of the target partitions in the target task message queue is determined according to the number of the data processing nodes, so that the resource utilization rate can be improved and the resource waste can be avoided when the data consumption end processes data.
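The node-to-partition mapping discussed above can be illustrated with a simple round-robin assignment (a sketch; in practice, Kafka's consumer-group assignment strategies would normally perform this distribution):

```python
def assign_partitions(node_ids, partition_ids):
    # Requires len(partition_ids) >= len(node_ids) (P >= X) so that
    # every data processing node has at least one target partition.
    assignment = {node: [] for node in node_ids}
    for i, partition in enumerate(partition_ids):
        assignment[node_ids[i % len(node_ids)]].append(partition)
    return assignment
```

With 8 nodes and 16 partitions, each node ends up consuming two target partitions, matching the example in the text.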
In some embodiments, the method may further comprise: in the process of writing n task messages into the target task message queue, each time one task message is written into the target task message queue, the target production data count in the second database is subjected to self-increment 1, wherein the target production data count corresponds to the target task.
The second database may be, for example, a redis database, and the target production data count is used to represent, in the second database, the total number of pieces of business data of the target task that have been produced.
In actual implementation, the target production data count may be stored in the redis database in the form of a key-value pair (Key-Value), where the key may take the form "taskId:produce"; for example, in the case that the task identifier of the target task is "5", the key may be "5:produce". It should be noted that, in actual implementation, the key may also be generated from the topic (Topic) corresponding to the target task in the target task message queue together with its task identifier; for example, in the case that the topic corresponding to the target task in the target task message queue is "aiedu-task", the key may be "aiedu-task:5:produce".
That is, in order for the task detection end corresponding to the data production end to accurately determine whether the task running state of the target task can be set to the "ended" state, whenever the data production end writes a task message into the target task message queue, it increments the target production data count in the second database by 1. The task detection end can then determine, based on this count and the corresponding target consumption data count, whether all the service data corresponding to the target task has been produced and processed, and thereby set the task running state to the "ended" state.
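The production-count bookkeeping above can be sketched with an in-memory stand-in for Redis (key formats follow the text; the class and function names are hypothetical):

```python
def produce_count_key(task_id, topic=None):
    # "5:produce", or "aiedu-task:5:produce" when the topic is included.
    return f"{topic}:{task_id}:produce" if topic else f"{task_id}:produce"

class CountStore:
    # Stand-in for a Redis INCR on the target production data count.
    def __init__(self):
        self.counts = {}
    def incr(self, key):
        self.counts[key] = self.counts.get(key, 0) + 1
        return self.counts[key]
```

The data production end would call incr once per task message written; the task detection end compares the resulting count against the consumption count.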
Corresponding to the data production method applied to the data production end in the above embodiment, the embodiment of the disclosure further provides a data processing method, please refer to fig. 3, which is a flowchart of a data processing method provided by the embodiment of the disclosure. The method may be applied to a data consumer, which may be the data consumer 102 shown in fig. 1. It should be noted that, in the embodiment of the present disclosure, the data consuming end may include a plurality of data processing nodes, and each data processing node may be a server, that is, the data consuming end in the embodiment of the present disclosure may be one or more servers corresponding to the data producing end.
As shown in fig. 3, the data processing method provided in the embodiment of the present disclosure may include the following steps S301 to S303, which will be described in detail below.
Step S301, a data processing request for a target task is received.
The data processing request is used for requesting the data consumption end to process the data of the task message which is produced by the data production end and written into the target task message queue.
In actual implementation, one or more servers for performing data processing can be started in advance, and during the creation of the target task through a system configuration interface, an operator can select, from these servers as needed, the server that will perform data processing on the service data corresponding to the target task; of course, in actual implementation, the operator may also manually start a server as the data consumption end of the target task after creating the target task, which is not limited herein.
In some embodiments, the data processing request may include the task identifier of the target task, the topic (Topic) of the target task in the target task message queue, and the partition (Partition) numbers under that topic. In the case that the data consumption end includes multiple data processing nodes, in order to avoid wasting computing resources, the number X of data processing nodes is smaller than or equal to the number P of target partitions of the target task under the corresponding topic in the target task message queue.
Step S302, in response to a data processing request, k task messages are acquired from a target partition of a target task message queue based on a first thread, wherein k is a positive integer, and the task messages in the target partition of the target task message queue are written by a data production end corresponding to a data consumption end according to the data production method.
The first threads are threads in the data consumption end for reading task messages from the target task message queue, that is, from the target partitions under the topic corresponding to the target task in the Kafka message queue. There may be multiple first threads; for example, in the case that the data consumption end includes 4 data processing nodes, that is, 4 servers, 2 first threads may be set for each data processing node.
The target partition is one or more partitions under the subject corresponding to the target task in the target task message queue, and the task messages in the target partition can be n pieces of task messages generated based on n pieces of service data of the target task after the data production end acquires the n pieces of service data in batches.
Step S303, distributing k task messages to a plurality of second threads for concurrent processing based on the first thread; the second thread is used for processing the business data in the distributed task message.
The second thread may be a thread for receiving the task message allocated by the first thread and performing data processing on service data in the task message.
That is, in the embodiment of the present disclosure, a consumption main thread for acquiring task messages from the target task message queue and distributing them, together with multiple second threads for performing data processing on the task messages, may be set in the data consumption end, so that the service data corresponding to the target task is processed concurrently and data processing efficiency is improved.
Specifically, after receiving the data processing request for the target task, the data consumption end may pull task messages from the target partition corresponding to the target task in the target task message queue based on a pre-configured message consumption main thread, that is, the first thread, and perform data processing on the service data in the task messages, for example data cleaning and data labeling, so as to construct a training set for training a model such as a quality inspection model.
It may be understood that, in actual implementation, the number k of task messages pulled from the target task message queue by the first thread each time may be set as required, where k is smaller than the number n of pieces of service data acquired by the data production end in each batch. The first thread may repeat the above steps S301 to S303 after the plurality of second threads have completed the data processing on the service data in the k task messages, until all task messages in the target partition of the target task message queue have been processed.
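A minimal, runnable sketch of this pull-and-distribute loop, with an in-memory list standing in for the Kafka target partition and a thread pool standing in for the second threads; all names and the batch-processing shape are illustrative assumptions, not the embodiment's actual implementation.

```python
from concurrent.futures import ThreadPoolExecutor

def process_message(msg):
    # stand-in for second-thread work (e.g. data cleaning / labeling of one piece of service data)
    return "processed:" + msg

def consume_partition(partition, k, num_second_threads=4):
    """First-thread loop: pull up to k task messages at a time, distribute them
    to the second threads, and repeat until the target partition is drained."""
    results = []
    with ThreadPoolExecutor(max_workers=num_second_threads) as pool:
        while partition:
            batch, partition = partition[:k], partition[k:]   # first thread pulls k messages
            results.extend(pool.map(process_message, batch))  # wait for all k to finish
    return results

out = consume_partition(["msg%d" % i for i in range(10)], k=4)
```

The next pull happens only after the current k messages have been processed, which mirrors the "repeat after the second threads complete" behavior described above.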
In some embodiments, the data consumer may include a plurality of data processing nodes, each having a first thread disposed therein; the number of target partitions corresponds to the number of data processing nodes. In this embodiment, the obtaining k task messages from the target partition based on the first thread in step S302 includes: setting the corresponding relation between a plurality of data processing nodes and a plurality of target partitions in a target task message queue; and controlling first threads in the plurality of data processing nodes to acquire k task messages from corresponding target partitions of the target task message queue according to the corresponding relation.
In this embodiment, the distributing of the k task messages to the plurality of second threads for concurrent data processing based on the first thread in step S303 includes: the first thread in each data processing node sends the acquired k task messages to the plurality of second threads in that data processing node for concurrent data processing.
In other words, in actual implementation, the corresponding relationship between the first thread and each partition in each data processing node can be configured adaptively according to the number of partitions of the target partition and the number of data processing nodes in the data consumption end, so as to improve the data processing efficiency.
For example, in the case where the number of target partitions is 8, the number of data processing nodes is 4, and each data processing node includes only one first thread, the single first thread in each data processing node may be configured to pull task messages from 2 partitions and distribute the pulled task messages to the second threads in its own node for concurrent processing.
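The node-to-partition correspondence in this example (8 partitions, 4 nodes, 2 partitions per node's first thread) can be sketched as below; the round-robin split is an illustrative assumption, since the embodiment only requires that each node's first thread pull from a fixed set of partitions.

```python
def assign_partitions(num_partitions, num_nodes):
    """Map each data processing node to the target partitions its single
    first thread will pull from (round-robin split, assumed for illustration)."""
    mapping = {node: [] for node in range(num_nodes)}
    for p in range(num_partitions):
        mapping[p % num_nodes].append(p)
    return mapping

mapping = assign_partitions(8, 4)  # e.g. node 0 pulls from partitions 0 and 4
```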
In some embodiments, after distributing the k task messages to the plurality of second threads for concurrent data processing based on the first thread, the method further comprises: detecting, based on the first thread, the execution results of the plurality of second threads for the k task messages to obtain a detection result; and in the case that the detection result shows that the service data in the k task messages has been processed successfully, incrementing by k the target consumption data count in the second database, where the target consumption data count corresponds to the target task.
Similar in principle to the target production data count, the target consumption data count represents the total number of pieces of data produced by the data production end for which the corresponding data consumption end has completed processing. The target consumption data count may be stored in the second database in the form of a key-value pair; for example, where the second database is a MySQL database, the key corresponding to the target consumption data count may be generated from the task identifier of the target task, e.g. in the case that the task identifier of the target task is "5", the key may be "5:Consume". It should be noted that, in actual implementation, the key may also be generated from the Topic (Topic) corresponding to the target task in the target task message queue together with the task identifier; for example, in the case that the Topic corresponding to the target task in the target task message queue is "aiedu-task", the key may be "aiedu-task:5:Consume".
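A minimal sketch of the key construction and the increment-by-k update described above, using a Python dict standing in for the second database; the helper names and the dict stand-in are illustrative assumptions.

```python
def consume_count_key(task_id, topic=None):
    """Build the key for the target consumption data count, e.g. '5:Consume'
    or 'aiedu-task:5:Consume' when the Topic is included."""
    return (topic + ":" if topic else "") + str(task_id) + ":Consume"

def incr_consume_count(store, task_id, k, topic=None):
    # increment the count by k after all k task messages are processed successfully
    key = consume_count_key(task_id, topic)
    store[key] = store.get(key, 0) + k
    return store[key]

db = {}  # dict standing in for the second database
incr_consume_count(db, "5", 4, topic="aiedu-task")
```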
Specifically, after a second thread in the data consumption end completes the data processing on one piece of service data, it generates a processed data record corresponding to that service data and writes the record into a data execution result table of the first database. The first thread may determine whether all the second threads have completed processing the service data in the allocated task messages by detecting updates to the processed data records in the data execution result table, or by sending a request to the second threads. If all have completed, the first thread may update the corresponding consumption count in the second database, so that the task detection end can determine whether the task running state of the target task may be set to an end state by detecting whether the values of the target production data count and the target consumption data count in the second database are consistent.
It will be appreciated that the above-mentioned method embodiments of the present disclosure may be combined with each other to form combined embodiments without departing from the principle logic, which are not described in detail here for brevity. It will be appreciated by those skilled in the art that, in the above methods of the embodiments, the specific order of execution of the steps should be determined by their functions and possible inherent logic.
Corresponding to the above embodiments, the embodiments of the present disclosure further provide a task processing system, which may include the data production end and the data consumption end, and may further include a task detection end, which may be, for example, the task detection end 104 shown in fig. 1.
In the task processing system, the data production end may perform production processing on the service data corresponding to the target task based on the data production method in the above embodiment.
The data consumption end may process, based on the data processing method in the above embodiments, the service data produced by the data production end.
The task detection end can be used for acquiring the target production data count and the target consumption data count corresponding to the target task under the condition that the task production state of the target task is the first preset state, and updating the task running state of the target task to be the second preset state under the condition that the numerical values of the target production data count and the target consumption data count are the same and are larger than the preset numerical value.
The first preset state may be a state for indicating that the target task is in "production complete".
In practice, the task detection end in the task processing system may start a timed task thread, which executes once every preset time interval, for example, every 10 minutes.
In the case that the task running state of the target task in the target task record of the data production task table of the third database is the to-be-processed state, namely "running", and the task production state is the first preset state, namely "production complete", the timed task thread may acquire the target production data count R1 and the target consumption data count R2 corresponding to the target task from the second database. If R1 and R2 are the same and their value is greater than the preset value 0, this indicates that all service data of the target task has been processed by the data consumption end, and the value of the task running state in the corresponding target task record may be updated to the "end" state. If R1 and R2 differ, this indicates that the service data of the target task has not been completely processed, and the timed task thread may acquire the latest R1 and R2 again for detection and judgment after the preset time interval.
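The detection logic above can be sketched as follows; the field names, state strings, and helper functions are illustrative assumptions, with plain values standing in for the database reads.

```python
def check_task_finished(r1, r2, preset_value=0):
    """Finished only when production count R1 equals consumption count R2
    and both exceed the preset value (0)."""
    return r1 == r2 and r1 > preset_value

def update_task_state(record, r1, r2):
    # record stands in for the target task record of the data production task table
    if record.get("run_state") == "running" and record.get("prod_state") == "production complete":
        if check_task_finished(r1, r2):
            record["run_state"] = "end"
    return record

rec = update_task_state({"run_state": "running", "prod_state": "production complete"}, 5, 5)
```

The `r1 > 0` guard matters: a task whose counts are both still 0 has produced nothing yet and must not be marked finished merely because the counts are equal.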
It should be noted that, when the timed task thread in the task detection end acquires R1 and R2 from the second database, in order to ensure data accuracy it needs to first acquire a distributed lock from the redis database and release the distributed lock after completing one round of detection and judgment; the detailed processing procedure is not repeated here.
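The distributed-lock step can be sketched as a SETNX-style acquire/release over an in-memory dict standing in for redis; the key name, TTL, and helper functions are illustrative assumptions, not the redis client API.

```python
import time

LOCKS = {}  # dict standing in for redis

def acquire_lock(key, owner, ttl=30):
    """SETNX-style acquire: succeeds only if the lock is free or has expired."""
    now = time.time()
    entry = LOCKS.get(key)
    if entry is None or entry[1] < now:
        LOCKS[key] = (owner, now + ttl)
        return True
    return False

def release_lock(key, owner):
    # only the current holder may release the lock
    if LOCKS.get(key, (None,))[0] == owner:
        del LOCKS[key]

got = acquire_lock("task:5:detect", "node-a")
```

The owner check on release avoids one detection round accidentally freeing a lock held by another node after its own lease has expired.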
In addition, the present disclosure further provides a task processing apparatus, an electronic device, and a computer-readable storage medium, all of which may be used to implement the data production method or the data processing method provided by the present disclosure; for the corresponding technical solutions and descriptions, refer to the corresponding descriptions of the method parts, which are not repeated here.
Fig. 4 is a block diagram of a data production device according to an embodiment of the present disclosure.
Referring to fig. 4, an embodiment of the present disclosure provides a data production apparatus that may be applied to a data production end, the data production apparatus 400 including: a first acquisition unit 401, a second acquisition unit 402, a data acquisition unit 403, and a data production unit 404.
The first obtaining unit 401 is configured to obtain a task identifier of a target task and a first task running state.
The second obtaining unit 402 is configured to obtain, according to the task identifier and the first task running state, a first processing lot identifier corresponding to the target task.
The data obtaining unit 403 is configured to generate a second processing batch identifier of the target task according to the first processing batch identifier, and obtain n pieces of service data of the target task in the second processing batch, where the service data corresponding to the target task includes n pieces of service data, and n is a positive integer.
The data production unit 404 is configured to generate n task messages of n pieces of service data according to the task identifier and the second processing batch identifier.
In some embodiments, the apparatus 400 further comprises a determining unit configured to: before the step of generating the second processing batch identifier of the target task and acquiring n pieces of business data of the target task in the second processing batch is executed, determine whether the target task meets a preset data production condition according to the first processing batch identifier; and in the case that the target task meets the preset data production condition, execute the step of generating the second processing batch identifier of the target task and acquiring n pieces of business data of the target task in the second processing batch.
In some embodiments, when determining whether the target task meets the preset data production condition according to the first processing batch identifier, the determining unit may be configured to: query the quantity of processed data according to the first processing batch identifier, where the quantity of processed data is the number of pieces, among the n pieces of business data of the target task in the first processing batch, that have been read and processed by the data consumption end, the first processing batch being the processing batch represented by the first processing batch identifier; obtain the allowance of data to be processed in the first processing batch according to the quantity of processed data; and determine that the target task meets the preset data production condition in the case that the allowance of data to be processed is less than or equal to a preset threshold.
In some embodiments, the determining unit may further be configured to: under the condition that the allowance of the data to be processed is larger than a preset threshold value, determining that the target task does not meet the preset data production condition; after waiting for the preset time interval, the step of determining whether the target task meets the preset data production condition according to the first processing batch identification is executed again.
In some embodiments, the determining unit may further be configured to: if, after waiting for the preset time interval, the target task is determined to meet the preset data production condition, then before the step of generating the second processing batch identifier of the target task and acquiring n pieces of service data of the target task in the second processing batch is executed, acquire a second task running state of the target task after the preset time interval; and execute the step of generating the second processing batch identifier of the target task and acquiring n pieces of business data of the target task in the second processing batch in the case that the second task running state indicates that the target task is in a to-be-processed state.
In some embodiments, when acquiring the n pieces of business data of the target task in the second processing batch, the data acquiring unit 403 may be configured to: query a first data identifier according to the task identifier, where the first data identifier is the data identifier with the largest value among the data identifiers of the n pieces of business data under the first processing batch, and the first processing batch is the processing batch represented by the first processing batch identifier; and acquire, according to a preset number m, n pieces of service data whose data identifiers are greater than the first data identifier from the first database as the n pieces of service data in the second processing batch. The first database is used for storing all business data corresponding to the target task, and the business data in the first database is arranged in ascending order of its data identifier; the preset number m limits the number of pieces of service data acquired from the first database in each processing batch to be less than or equal to m, where n is less than or equal to m.
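The batch-by-largest-identifier fetch described above can be sketched as follows, with a list of dicts standing in for the first database; the field names and helper function are illustrative assumptions.

```python
def fetch_next_batch(rows, last_max_id, m):
    """Fetch at most m rows whose data identifier exceeds the largest identifier
    of the previous batch; rows are kept in ascending identifier order."""
    batch = [r for r in sorted(rows, key=lambda r: r["id"]) if r["id"] > last_max_id][:m]
    new_max = batch[-1]["id"] if batch else last_max_id
    return batch, new_max

rows = [{"id": i, "data": "d%d" % i} for i in range(1, 8)]   # 7 rows in the first database
b1, max1 = fetch_next_batch(rows, 0, 3)     # first batch: ids 1..3
b2, max2 = fetch_next_batch(rows, max1, 3)  # second batch: ids 4..6
b3, _ = fetch_next_batch(rows, max2, 3)     # final batch returns fewer than m rows
```

A final batch shorter than m is the signal, per the next embodiment, for updating the task production state to the first preset state ("production complete").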
In some embodiments, the data acquisition unit 403 may also be configured to: and under the condition that the number of the n pieces of business data in the second processing batch is smaller than the preset number m, updating the task production state of the target task to be a first preset state.
In some embodiments, the apparatus 400 may further include a writing unit that may be configured to: write the n task messages into target partitions of a target task message queue, wherein there are a plurality of target partitions and the number of target partitions is determined according to the number of data processing nodes at the data consumption end. The writing unit, when writing the n task messages into the target partitions of the target task message queue, may be configured to: write the n task messages into the plurality of target partitions of the target task message queue according to a preset allocation mechanism.
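A hedged sketch of one possible preset allocation mechanism; the source leaves the mechanism unspecified, so round robin is assumed purely for illustration.

```python
from itertools import cycle

def allocate_messages(messages, partitions):
    """Spread the n task messages across the target partitions in round robin
    (one possible preset allocation mechanism, assumed for illustration)."""
    buckets = {p: [] for p in partitions}
    for msg, p in zip(messages, cycle(partitions)):
        buckets[p].append(msg)
    return buckets

buckets = allocate_messages(["m0", "m1", "m2", "m3", "m4"], [0, 1])
```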
In some embodiments, the apparatus 400 further comprises a first counting unit, which may be configured to: in the process of writing the n task messages into the target task message queue, increment by 1 the target production data count in the second database each time one task message is written into the target task message queue, where the target production data count corresponds to the target task.
Therefore, based on the data production apparatus provided by the embodiments of the present disclosure, the data production end does not need to take out all the service data of the target task at one time, but can produce data in batches. Compared with producing all the service data at once, this processing mode can avoid data blocking caused by stacking a large amount of service data in a cache; at the same time, when the task running state of the target task is adjusted manually, it can also reduce the workload consumed when cleaning a large amount of cached service data and avoid resource waste. In addition, the n pieces of service data acquired in one processing batch are written, in the form of task messages, into the target partition corresponding to the target task in the target task message queue, which decouples the data production end from the data consumption end, so that the running state of the target task can be adjusted flexibly without being controlled by the data consumption end.
Fig. 5 is a block diagram of a data processing apparatus according to an embodiment of the present disclosure.
Referring to fig. 5, an embodiment of the present disclosure provides a data processing apparatus that may be applied to a data consuming side, the data processing apparatus 500 including: a receiving unit 501, a third acquiring unit 502, and a data processing unit 503.
A receiving unit 501, configured to receive a data processing request for a target task.
The third obtaining unit 502 is configured to obtain k task messages from the target partition of the target task message queue based on the first thread in response to the data processing request, where k is a positive integer, and the task messages in the target partition of the target task message queue are written by the data producer corresponding to the data consumer according to the data producing method in the embodiment of the disclosure.
A data processing unit 503, configured to distribute k task messages to a plurality of second threads for concurrent processing based on the first thread; the second thread is used for processing the business data in the distributed task message.
In some embodiments, the data consumer comprises a plurality of data processing nodes, each of which has a first thread disposed therein; the number of the target partitions corresponds to the number of the data processing nodes; the third obtaining unit 502 may be configured to, when obtaining k task messages from the target partition based on the first thread: setting the corresponding relation between a plurality of data processing nodes and a plurality of target partitions in a target task message queue; and controlling first threads in the plurality of data processing nodes to acquire k task messages from corresponding target partitions of the target task message queue according to the corresponding relation.
In some embodiments, the apparatus 500 further comprises a counting unit, which may be configured to: after the k task messages are distributed to the plurality of second threads for concurrent data processing based on the first thread, detect the execution results of the plurality of second threads for the k task messages based on the first thread to obtain a detection result; and in the case that the detection result shows that the service data in the k task messages has been processed successfully, increment by k the target consumption data count in the second database, where the target consumption data count corresponds to the target task.
According to the data processing apparatus provided by the embodiments of the present disclosure, since the data consumption end does not need to be coupled with the data production end, the limitation of single-threaded data processing, imposed by the premise that the data production end must flexibly control the running state of the target task, is removed. After the receiving unit receives the data processing request for the target task, the third acquiring unit uses the first thread to acquire k task messages from the target partition of the target task message queue, and the data processing unit distributes the k task messages to a plurality of second threads for concurrent processing based on the first thread, which can improve the processing speed of the service data of the target task and thus the processing efficiency of the target task.
Fig. 6 is a block diagram of an electronic device according to an embodiment of the present disclosure.
Referring to fig. 6, an embodiment of the present disclosure provides an electronic device 600 including: at least one processor 601; at least one memory 602, and one or more I/O interfaces 603, connected between the processor 601 and the memory 602; the memory 602 stores one or more computer programs executable by the at least one processor 601, and the one or more computer programs are executed by the at least one processor 601 to enable the at least one processor 601 to perform the data production method or the data processing method described above.
The disclosed embodiments also provide a computer-readable storage medium having a computer program stored thereon, wherein the computer program, when executed by a processor, implements the data production method or the data processing method described above. The computer readable storage medium may be a volatile or nonvolatile computer readable storage medium.
Embodiments of the present disclosure also provide a computer program product comprising computer readable code, or a non-transitory computer readable storage medium carrying computer readable code, which when executed in a processor of an electronic device, performs the above-described data production method or data processing method.
Those of ordinary skill in the art will appreciate that all or some of the steps, systems, functional modules/units in the apparatus, and methods disclosed above may be implemented as software, firmware, hardware, and suitable combinations thereof. In a hardware implementation, the division between the functional modules/units mentioned in the above description does not necessarily correspond to the division of physical components; for example, one physical component may have multiple functions, or one function or step may be performed cooperatively by several physical components. Some or all of the physical components may be implemented as software executed by a processor, such as a central processing unit, digital signal processor, or microprocessor, or as hardware, or as an integrated circuit, such as an application specific integrated circuit. Such software may be distributed on computer-readable storage media, which may include computer storage media (or non-transitory media) and communication media (or transitory media).
The term computer storage media includes both volatile and nonvolatile, removable and non-removable media implemented in any method or technology for storage of information such as computer readable program instructions, data structures, program modules or other data, as known to those skilled in the art. Computer storage media includes, but is not limited to, random Access Memory (RAM), read Only Memory (ROM), erasable Programmable Read Only Memory (EPROM), static Random Access Memory (SRAM), flash memory or other memory technology, portable compact disc read only memory (CD-ROM), digital Versatile Discs (DVD) or other optical disc storage, magnetic cassettes, magnetic tape, magnetic disk storage or other magnetic storage devices, or any other medium which can be used to store the desired information and which can be accessed by a computer. Furthermore, as is well known to those of ordinary skill in the art, communication media typically embodies computer readable program instructions, data structures, program modules or other data in a modulated data signal such as a carrier wave or other transport mechanism and may include any information delivery media.
The computer readable program instructions described herein may be downloaded from a computer readable storage medium to a respective computing/processing device or to an external computer or external storage device over a network, such as the internet, a local area network, a wide area network, and/or a wireless network. The network may include copper transmission cables, fiber optic transmissions, wireless transmissions, routers, firewalls, switches, gateway computers and/or edge servers. The network interface card or network interface in each computing/processing device receives computer readable program instructions from the network and forwards the computer readable program instructions for storage in a computer readable storage medium in the respective computing/processing device.
The computer program instructions for performing the operations of the present disclosure may be assembly instructions, instruction set architecture (ISA) instructions, machine-related instructions, microcode, firmware instructions, state setting data, or source or object code written in any combination of one or more programming languages, including an object oriented programming language such as Smalltalk, C++ or the like, and conventional procedural programming languages, such as the "C" programming language or similar programming languages. The computer readable program instructions may be executed entirely on the user's computer, partly on the user's computer, as a stand-alone software package, partly on the user's computer and partly on a remote computer, or entirely on the remote computer or server. In the case of a remote computer, the remote computer may be connected to the user's computer through any kind of network, including a local area network (LAN) or a wide area network (WAN), or may be connected to an external computer (for example, through the Internet using an Internet service provider). In some embodiments, aspects of the present disclosure are implemented by personalizing electronic circuitry, such as programmable logic circuitry, field programmable gate arrays (FPGAs), or programmable logic arrays (PLAs), with state information of computer readable program instructions, which can execute the computer readable program instructions.
The computer program product described herein may be embodied in hardware, software, or a combination thereof. In an alternative embodiment, the computer program product is embodied as a computer storage medium, and in another alternative embodiment, the computer program product is embodied as a software product, such as a software development kit (Software Development Kit, SDK), or the like.
Various aspects of the present disclosure are described herein with reference to flowchart illustrations and/or block diagrams of methods, apparatus (systems) and computer program products according to embodiments of the disclosure. It will be understood that each block of the flowchart illustrations and/or block diagrams, and combinations of blocks in the flowchart illustrations and/or block diagrams, can be implemented by computer-readable program instructions.
These computer readable program instructions may be provided to a processor of a general purpose computer, special purpose computer, or other programmable data processing apparatus to produce a machine, such that the instructions, which execute via the processor of the computer or other programmable data processing apparatus, create means for implementing the functions/acts specified in the flowchart and/or block diagram block or blocks. These computer readable program instructions may also be stored in a computer readable storage medium that can direct a computer, programmable data processing apparatus, and/or other devices to function in a particular manner, such that the computer readable medium having the instructions stored therein includes an article of manufacture including instructions which implement the function/act specified in the flowchart and/or block diagram block or blocks.
The computer readable program instructions may also be loaded onto a computer, other programmable data processing apparatus, or other devices to cause a series of operational steps to be performed on the computer, other programmable apparatus or other devices to produce a computer implemented process such that the instructions which execute on the computer, other programmable apparatus or other devices implement the functions/acts specified in the flowchart and/or block diagram block or blocks.
The flowcharts and block diagrams in the figures illustrate the architecture, functionality, and operation of possible implementations of systems, methods and computer program products according to various embodiments of the present disclosure. In this regard, each block in the flowchart or block diagrams may represent a module, segment, or portion of instructions, which comprises one or more executable instructions for implementing the specified logical function(s). In some alternative implementations, the functions noted in the block may occur out of the order noted in the figures. For example, two blocks shown in succession may, in fact, be executed substantially concurrently, or the blocks may sometimes be executed in the reverse order, depending upon the functionality involved. It will also be noted that each block of the block diagrams and/or flowchart illustration, and combinations of blocks in the block diagrams and/or flowchart illustration, can be implemented by special purpose hardware-based systems which perform the specified functions or acts, or combinations of special purpose hardware and computer instructions.
Example embodiments have been disclosed herein, and although specific terms are employed, they are used and should be interpreted in a generic and descriptive sense only and not for purpose of limitation. In some instances, it will be apparent to one skilled in the art that features, characteristics, and/or elements described in connection with a particular embodiment may be used alone or in combination with other embodiments unless explicitly stated otherwise. It will be understood by those skilled in the art that various changes in form and details may be made therein without departing from the scope of the disclosure as set forth in the appended claims.

Claims (17)

1. A data production method, applied to a data production end, the method comprising:
acquiring a task identifier of a target task and a first task running state;
acquiring a first processing batch identifier corresponding to the target task according to the task identifier and the first task running state;
generating a second processing batch identifier of the target task according to the first processing batch identifier, and acquiring n pieces of business data of the target task in the second processing batch, wherein the business data corresponding to the target task comprises the n pieces of business data, and n is a positive integer;
and generating n task messages of the n business data according to the task identifier and the second processing batch identifier.
2. The method of claim 1, wherein, prior to the step of generating the second processing batch identification of the target task and acquiring n pieces of business data of the target task under the second processing batch, the method further comprises:
Determining whether the target task meets preset data production conditions according to the first processing batch identification;
and executing, under the condition that the target task meets the preset data production condition, the step of generating the second processing batch identification of the target task and acquiring n pieces of business data of the target task in the second processing batch.
3. The method of claim 2, wherein determining whether the target task meets a preset data production condition based on the first processing lot identification comprises:
Inquiring the number of processed data according to the first processing batch identifier, wherein the number of processed data is as follows: the number of the target tasks which are read and processed by the data consumption end in the n pieces of business data in the first processing batch; the first processing batch is the processing batch represented by the first processing batch identifier;
Obtaining the allowance of the data to be processed in the first processing batch according to the processed data quantity;
And under the condition that the allowance of the data to be processed is smaller than or equal to a preset threshold value, determining that the target task meets the preset data production condition.
4. A method according to claim 3, characterized in that the method further comprises:
under the condition that the data allowance to be processed is larger than the preset threshold, determining that the target task does not meet the preset data production condition;
and after waiting for a preset time interval, executing the step of determining whether the target task meets preset data production conditions according to the first processing batch identifier again.
5. The method of claim 4, wherein if after waiting for the preset time interval, determining that the target task meets the preset data production condition, before performing the step of generating a second processing lot identification of the target task, and acquiring n pieces of business data of the target task under the second processing lot, the method further comprises:
acquiring a second task running state of the target task after waiting for the preset time interval;
Executing the second processing batch identification for generating the target task and acquiring n pieces of business data of the target task under a second processing batch under the condition that the second task running state indicates that the target task is in the to-be-processed state.
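The backpressure check of claims 3-5 (only produce the next batch once the previous batch is nearly drained, otherwise wait and re-check) can be sketched as a poll loop. All names and defaults here are illustrative; the patent does not prescribe them.

```python
import time

def wait_until_batch_drained(get_processed_count, batch_size,
                             threshold=0, interval=0.1, max_retries=50):
    """Poll until the previous batch's to-be-processed margin falls to or
    below a threshold; a hypothetical sketch of the claims 3-5 condition."""
    for _ in range(max_retries):
        # Margin = data produced in the previous batch minus data the
        # consumption end has already read and processed.
        margin = batch_size - get_processed_count()
        if margin <= threshold:
            return True          # preset data production condition met
        time.sleep(interval)     # wait a preset interval, then re-check
    return False                 # condition never met within the retry budget
```

A production system would typically also re-read the task running state after each wait, as claim 5 requires, before actually producing the next batch.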
6. The method of claim 1, wherein the obtaining n pieces of business data for the target task in the second processing batch comprises:
querying a first data identifier according to the task identifier, wherein the first data identifier is the largest-valued data identifier among the data identifiers of the n pieces of business data in a first processing batch, and the first processing batch is the processing batch indicated by the first processing batch identifier;
acquiring, from a first database according to a preset number m, n pieces of business data whose data identifiers are greater than the first data identifier, as the n pieces of business data in the second processing batch;
wherein the first database is used for storing all business data corresponding to the target task, the business data in the first database are arranged in ascending order of their data identifiers, the preset number m limits the number of pieces of business data acquired from the first database in each processing batch to be less than or equal to m, and n is less than or equal to m.
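The batch fetch of claim 6 is a keyset-pagination pattern: take rows whose data identifier exceeds the previous batch's maximum, in ascending order, capped at m. A minimal SQL sketch, assuming a hypothetical `business_data(task_id, data_id, payload)` schema:

```python
import sqlite3

def fetch_next_batch(conn, task_id, first_data_id, m):
    """Fetch the next batch as claim 6 describes: rows with data_id
    greater than the previous batch's maximum, ascending, limit m.
    Table and column names are assumptions for illustration."""
    return conn.execute(
        "SELECT data_id, payload FROM business_data "
        "WHERE task_id = ? AND data_id > ? "
        "ORDER BY data_id ASC LIMIT ?",
        (task_id, first_data_id, m),
    ).fetchall()
```

Because the cursor is the last seen identifier rather than an OFFSET, each batch query stays cheap even deep into a large table; a result shorter than m signals the final batch, which matches the claim-7 trigger for updating the task production state.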
7. The method of claim 6, wherein the method further comprises:
updating the task production state of the target task to a first preset state when the number of the n pieces of business data in the second processing batch is less than the preset number m.
8. The method of claim 1, wherein the method further comprises: writing the n task messages into target partitions of a target task message queue, wherein there are a plurality of target partitions, and the number of target partitions is determined according to the number of data processing nodes in a data consumption end;
the writing the n task messages into the target partitions of the target task message queue comprises:
writing the n task messages into the plurality of target partitions of the target task message queue according to a preset allocation mechanism.
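Claim 8 leaves the "preset allocation mechanism" open; one common choice is round-robin, which spreads the n messages nearly evenly across the partitions (and hence across the consumer nodes). A purely illustrative sketch:

```python
def assign_to_partitions(task_messages, num_partitions):
    """One possible preset allocation mechanism: round-robin the task
    messages across the target partitions. Hypothetical, not the
    patent's prescribed scheme."""
    partitions = [[] for _ in range(num_partitions)]
    for i, msg in enumerate(task_messages):
        # Message i lands in partition i mod num_partitions.
        partitions[i % num_partitions].append(msg)
    return partitions
```

A key- or hash-based assignment would work equally well when ordering per business key matters more than even load.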
9. The method of claim 8, wherein the method further comprises:
in the process of writing the n task messages into the target task message queue, incrementing, by 1, a target production data count in a second database each time a task message is written into the target task message queue, wherein the target production data count corresponds to the target task.
10. A data processing method, applied to a data consumer, the method comprising:
receiving a data processing request for a target task;
in response to the data processing request, obtaining k task messages from a target partition of a target task message queue based on a first thread, wherein k is a positive integer, and the task messages in the target partition of the target task message queue are written by a data production end corresponding to the data consumption end according to the data production method of any one of claims 1-9;
and distributing, based on the first thread, the k task messages to a plurality of second threads for concurrent processing, wherein the second threads are used for processing the business data in the task messages distributed to them.
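The claim-10 consumption pattern, a coordinating "first" thread pulling k messages and fanning them out to worker "second" threads, maps naturally onto a thread pool. In this sketch, `fetch_messages` and `process_one` are caller-supplied stand-ins for the queue read and the business processing; none of these names come from the patent.

```python
from concurrent.futures import ThreadPoolExecutor

def consume_batch(fetch_messages, process_one, k, workers=4):
    """Claim-10 style sketch: the first thread reads k task messages
    and distributes them to a pool of second threads."""
    messages = fetch_messages(k)  # first thread reads from the target partition
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Second threads process the distributed messages concurrently;
        # map preserves input order in the returned results.
        results = list(pool.map(process_one, messages))
    return results
```

Keeping the queue read on a single coordinating thread means only that thread needs to track offsets/acknowledgements, while processing parallelism is tuned independently via the pool size.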
11. The method of claim 10, wherein the data consuming side comprises a plurality of data processing nodes, each data processing node having the first thread disposed therein; the number of the target partitions corresponds to the number of the data processing nodes;
the obtaining k task messages from the target partition based on the first thread comprises:
setting a correspondence between the plurality of data processing nodes and a plurality of target partitions in the target task message queue;
and controlling the first threads in the plurality of data processing nodes to acquire the k task messages from the corresponding target partitions of the target task message queue according to the correspondence.
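Claim 11 pairs each data processing node with its own target partition, so no two first threads read the same partition. A hypothetical one-to-one assignment, assuming the counts match as the claim implies:

```python
def map_nodes_to_partitions(node_ids, partition_ids):
    """Sketch of claim 11's node-to-partition correspondence; assumes
    one partition per node, which the claim's setup implies."""
    if len(node_ids) != len(partition_ids):
        raise ValueError("expected one target partition per data processing node")
    # Sort both sides so the mapping is deterministic across restarts.
    return dict(zip(sorted(node_ids), sorted(partition_ids)))
```

Each node's first thread then consults this mapping to know which partition it alone is responsible for.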
12. The method of claim 10, wherein after distributing the k task messages to a plurality of second threads for concurrent processing of data based on the first thread, the method further comprises:
detecting, based on the first thread, execution results of the plurality of second threads for the k task messages to obtain a detection result;
and incrementing, by k, a target consumption data count in a second database when the detection result indicates that the business data in the k task messages has been successfully processed, wherein the target consumption data count corresponds to the target task.
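Claim 12's settlement step advances the consumption count only after the whole batch succeeds, keeping the count an exact measure of fully processed messages. A sketch where a plain dict stands in for the second database:

```python
def settle_batch(results, counts, task_id, k):
    """Claim-12 sketch: increment the task's consumption count by k only
    when all k task messages were processed successfully. `counts` is a
    hypothetical stand-in for the second database."""
    if len(results) == k and all(results):
        counts[task_id] = counts.get(task_id, 0) + k
        return True
    return False  # detection found a failure; count left untouched
```

The all-or-nothing increment is what lets the detection end of claim 15 compare production and consumption counts for equality to decide completion.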
13. A data production device, applied to a data production end, the device comprising:
a first acquisition unit for acquiring a task identifier of a target task and a first task running state;
a second acquisition unit for acquiring a first processing batch identifier corresponding to the target task according to the task identifier and the first task running state;
a data acquisition unit for generating a second processing batch identifier of the target task according to the first processing batch identifier, and acquiring n pieces of business data of the target task in the second processing batch, wherein the business data corresponding to the target task comprises the n pieces of business data, and n is a positive integer;
and a data production unit for generating n task messages of the n pieces of business data according to the task identifier and the second processing batch identifier.
14. A data processing apparatus, applied to a data consumption end, the apparatus comprising:
a receiving unit for receiving a data processing request for a target task;
a third acquisition unit for obtaining, in response to the data processing request, k task messages from a target partition of a target task message queue based on a first thread, wherein k is a positive integer, and the task messages in the target partition of the target task message queue are written by a data production end corresponding to the data consumption end according to the data production method of any one of claims 1-9;
and a data processing unit for distributing, based on the first thread, the k task messages to a plurality of second threads for concurrent processing, wherein the second threads are used for processing the business data in the task messages distributed to them.
15. A task processing system, comprising:
a data production end, configured to perform production processing on service data corresponding to a target task according to the data production method of any one of claims 1 to 9;
a data consuming end for processing the business data produced and processed by the data producing end according to the data processing method of any one of claims 10-12;
a task detection end for acquiring a target production data count and a target consumption data count corresponding to the target task when the task production state of the target task is the first preset state, and updating the task running state of the target task to a second preset state when the values of the target production data count and the target consumption data count are the same and greater than a preset value.
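The detection end's completion test in claim 15 reduces to comparing the two counters once production has finished. A minimal sketch, with "greater than a preset value" simplified to "positive" as an illustrative assumption:

```python
def check_task_finished(production_count, consumption_count):
    """Claim-15 sketch: the task may move to the second preset (finished)
    state only when produced and consumed counts match and are positive.
    The positivity threshold here is an assumed stand-in for the claim's
    preset value."""
    return production_count == consumption_count and production_count > 0
```

Equality of the counts works as a completion signal precisely because the producer increments per message written (claim 9) and the consumer increments only per fully processed batch (claim 12).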
16. An electronic device, comprising:
at least one processor; and
a memory communicatively coupled to the at least one processor; wherein
the memory stores one or more computer programs executable by the at least one processor to enable the at least one processor to perform the data production method of any one of claims 1-9 or the data processing method of any one of claims 10-12.
17. A computer-readable storage medium having a computer program stored thereon, wherein the computer program, when executed by a processor, implements the data production method of any one of claims 1-9 or the data processing method of any one of claims 10-12.
CN202311316101.5A 2023-10-11 2023-10-11 Data production method, data processing method and device Pending CN117950820A (en)

Priority Applications (1)

Application Number Priority Date Filing Date Title
CN202311316101.5A CN117950820A (en) 2023-10-11 2023-10-11 Data production method, data processing method and device

Applications Claiming Priority (1)

Application Number Priority Date Filing Date Title
CN202311316101.5A CN117950820A (en) 2023-10-11 2023-10-11 Data production method, data processing method and device

Publications (1)

Publication Number Publication Date
CN117950820A 2024-04-30

Family

ID=90799349

Family Applications (1)

Application Number Title Priority Date Filing Date
CN202311316101.5A Pending CN117950820A (en) 2023-10-11 2023-10-11 Data production method, data processing method and device

Country Status (1)

Country Link
CN (1) CN117950820A (en)


Cited By (2)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN119105856A (en) * 2024-11-06 2024-12-10 乐麦信息技术(杭州)有限公司 E-commerce platform data quality inspection method and system
CN119105856B (en) * 2024-11-06 2025-05-20 乐麦信息技术(杭州)有限公司 E-commerce platform data quality inspection method and system

Similar Documents

Publication Publication Date Title
US20230004434A1 (en) Automated reconfiguration of real time data stream processing
CN108062246B (en) Resource regulating method and device for deep learning frame
CN107729139B (en) Method and device for concurrently acquiring resources
CN110769278B (en) Distributed video transcoding method and system
US10515326B2 (en) Database systems and related queue management methods
US8402466B2 (en) Practical contention-free distributed weighted fair-share scheduler
US10338958B1 (en) Stream adapter for batch-oriented processing frameworks
CN107451147B (en) Method and device for dynamically switching kafka clusters
CN112579148A (en) Service message processing method and device based on service agent and electronic equipment
US11915035B1 (en) Task state updating method and apparatus, device, and medium
CN110750592A (en) Data synchronization method, device and terminal equipment
US20220138074A1 (en) Method, electronic device and computer program product for processing data
EP4123449A1 (en) Resource scheduling method and related device
US8918795B2 (en) Handling and reporting of object state transitions on a multiprocess architecture
CN110968430B (en) Processing method based on message queue and message queue
CN113626217B (en) Asynchronous message processing method, device, electronic equipment and storage medium
CN114900449B (en) Resource information management method, system and device
US20090254652A1 (en) Resource correlation prediction
CN114610504A (en) Message processing method and device, electronic equipment and storage medium
CN113204425A (en) Method and device for process management internal thread, electronic equipment and storage medium
US20120102168A1 (en) Communication And Coordination Between Web Services In A Cloud-Based Computing Environment
CN113742057A (en) Task execution method and device
CN117076096A (en) Task flow execution method and device, computer readable medium and electronic equipment
CN117950820A (en) Data production method, data processing method and device
CN110399393B (en) Data processing method, device, medium and electronic equipment

Legal Events

Date Code Title Description
PB01 Publication
SE01 Entry into force of request for substantive examination