diff --git a/packages/module-workflow/src/client/nodes/create.tsx b/packages/module-workflow/src/client/nodes/create.tsx index 1a4521e221..f32f9f6d92 100644 --- a/packages/module-workflow/src/client/nodes/create.tsx +++ b/packages/module-workflow/src/client/nodes/create.tsx @@ -46,6 +46,16 @@ export default class extends Instruction { // disabled: true // } // }, + useWorker: { + type: 'boolean', + title: '强制优先使用工作线程', + name: 'useWorker', + 'x-decorator': 'FormItem', + 'x-component': 'Checkbox', + // 'x-component-props': { + // disabled: true + // } + }, params: { type: 'object', properties: { diff --git a/packages/module-workflow/src/server/Plugin.ts b/packages/module-workflow/src/server/Plugin.ts index 7cc1d07261..3ad249417d 100644 --- a/packages/module-workflow/src/server/Plugin.ts +++ b/packages/module-workflow/src/server/Plugin.ts @@ -2,9 +2,12 @@ import path from 'path'; import { Op, Transactionable } from '@tachybase/database'; import { Logger, LoggerOptions } from '@tachybase/logger'; import Application, { Plugin, PluginOptions } from '@tachybase/server'; -import { Registry } from '@tachybase/utils'; +import { Registry, uid } from '@tachybase/utils'; +import axios, { AxiosRequestConfig } from 'axios'; +import FormData from 'form-data'; import LRUCache from 'lru-cache'; +import mime from 'mime-types'; import initActions from './actions'; import { EXECUTION_STATUS, JOB_STATUS } from './constants'; @@ -576,4 +579,163 @@ export default class PluginWorkflowServer extends Plugin { return db.sequelize.transaction(); } } + + isJSON(str) { + try { + return JSON.parse(str); + } catch (e) { + return false; + } + } + + //目前可处理url,json对象,base64 + async handleResource(resource: string, origin: string, token: string) { + const parseRes = this.isJSON(resource); + const config: AxiosRequestConfig = { + method: 'get', + url: resource, + responseType: 'stream', + }; + const form = new FormData(); + if (resource.startsWith('data:')) { + const matches = resource.match(/^data:(.+);base64,(.+)$/); + if (matches) { + const contentType = matches[1]; + const base64Data = matches[2]; + const buffer = Buffer.from(base64Data, 'base64'); + const ext = mime.extension(contentType); + const filename = `${uid()}.${ext}`; + + form.append('file', buffer, { + filename, + contentType, + }); + } else { + throw new Error('Invalid data URL format'); + } + } else if (parseRes) { + const { url: resourceUrl, params: resourceParams, headers: resourceHeaders, body: resourceBody } = parseRes; + config.url = resourceUrl; + config.params = resourceParams; + config.headers = resourceHeaders; + if (resourceHeaders['content-type'] === 'multipart/form-data') { + const formData = new FormData(); + Object.entries(resourceBody).forEach(([key, value]) => { + formData.append(key, value); + }); + config.data = formData; + } else { + config.data = resourceBody; + } + const response = await axios(config); + const contentType = response.headers['content-type']; + // 根据 MIME 类型获取文件扩展名 + const ext = mime.extension(contentType); + const filename = `${uid()}.${ext}`; + // 创建 FormData 实例 + form.append('file', response.data, { + filename, + contentType: response.headers['content-type'], + }); + } else { + // 下载指定 URL 的内容 + const response = await axios(config); + // 获取文件的 MIME 类型 + const contentType = response.headers['content-type']; + // 根据 MIME 类型获取文件扩展名 + const ext = mime.extension(contentType); + const filename = `${uid()}.${ext}`; + // 创建 FormData 实例 + form.append('file', response.data, { + filename, + contentType: response.headers['content-type'], + }); + } + const uploadResponse = await axios({ + method: 'post', + url: origin + '/api/attachments:create', + data: form, + headers: { + ...form.getHeaders(), + Authorization: 'Bearer ' + token, + }, + }); + return uploadResponse.data.data; + } + + // 处理工作流中的附件字段 + private async handleAttachmentFields(params: { + dataSourceName: string; + collectionName: string; + values: any; + origin: string; + token: string; + }) { + const { dataSourceName, collectionName, values, origin, token } = params; + + const collection = this.app.dataSourceManager.dataSources + .get(dataSourceName) + .collectionManager.getCollection(collectionName); + + const fields = collection.getFields(); + const fieldNames = Object.keys(values); + const includesFields = fields.filter((field) => fieldNames.includes(field.options.name)); + + // 处理文件类型 + for (const attachmentField of includesFields) { + if (attachmentField.options.interface === 'attachment') { + const urls = values[attachmentField.options.name]; + if (Array.isArray(urls)) { + for (const i in urls) { + urls[i] = await this.handleResource(urls[i], origin, token); + } + } else { + const url = values[attachmentField.options.name]; + values[attachmentField.options.name] = await this.handleResource(url, origin, token); + } + } + } + + return collection; + } + + // 工作线程插入数据 + async workerWorkflowCreate(params) { + const { options, context, transaction } = params; + + const collection = await this.handleAttachmentFields({ + dataSourceName: params.dataSourceName, + collectionName: params.collectionName, + values: options.values, + origin: params.origin, + token: params.token, + }); + + const result = await collection.repository.create({ + ...options, + context, + transaction, + }); + return result.toJSON(); + } + + // 工作线程刷新数据 + async workerWorkflowUpdate(params) { + const { options, context, transaction } = params; + + const collection = await this.handleAttachmentFields({ + dataSourceName: params.dataSourceName, + collectionName: params.collectionName, + values: options.values, + origin: params.origin, + token: params.token, + }); + + const result = await collection.repository.update({ + ...options, + context, + transaction, + }); + return result.length ?? result; + } } diff --git a/packages/module-workflow/src/server/instructions/CreateInstruction.ts b/packages/module-workflow/src/server/instructions/CreateInstruction.ts index 421c51dc0b..8949237ccc 100644 --- a/packages/module-workflow/src/server/instructions/CreateInstruction.ts +++ b/packages/module-workflow/src/server/instructions/CreateInstruction.ts @@ -8,7 +8,9 @@ import _ from 'lodash'; import mime from 'mime-types'; import { Instruction } from '.'; +import { PluginWorkflow } from '..'; import { JOB_STATUS } from '../constants'; +import PluginWorkflowServer from '../Plugin'; import type Processor from '../Processor'; import type { FlowNodeModel } from '../types'; import { toJSON } from '../utils'; @@ -18,128 +20,49 @@ export class CreateInstruction extends Instruction { const { collection, params: { appends = [], ...params } = {} } = node.config; const [dataSourceName, collectionName] = parseCollectionName(collection); - const { repository, filterTargetKey } = this.workflow.app.dataSourceManager.dataSources - .get(dataSourceName) - .collectionManager.getCollection(collectionName); + const userId = _.get(processor.getScope(node.id), '$context.user.id', ''); + const token = this.workflow.app.authManager.jwt.sign({ userId }); const options = processor.getParsedValue(params, node.id); + const origin = Gateway.getInstance().runAtLoop; + const transaction = this.workflow.useDataSourceTransaction(dataSourceName, processor.transaction); + const app = this.workflow.app; + const plugin = app.pm.get(PluginWorkflow); - const c = this.workflow.app.dataSourceManager.dataSources + const { repository, filterTargetKey } = this.workflow.app.dataSourceManager.dataSources .get(dataSourceName) .collectionManager.getCollection(collectionName); - const fields = c.getFields(); - const fieldNames = Object.keys(params.values); - const includesFields = fields.filter((field) => fieldNames.includes(field.options.name)); - - const userId = _.get(processor.getScope(node.id), '$context.user.id', ''); - const token = this.workflow.app.authManager.jwt.sign({ userId }); - const isJSON = (str) => { - try { - return JSON.parse(str); - } catch (e) { - return false; - } + const context = { + stack: Array.from(new Set((processor.execution.context.stack ?? []).concat(processor.execution.id))), }; - //目前可处理url,json对象,base64 - const handleResource = async (resource) => { - const parseRes = isJSON(resource); - const config: AxiosRequestConfig = { - method: 'get', - url: resource, - responseType: 'stream', - }; - const form = new FormData(); - if (resource.startsWith('data:')) { - const matches = resource.match(/^data:(.+);base64,(.+)$/); - if (matches) { - const contentType = matches[1]; - const base64Data = matches[2]; - const buffer = Buffer.from(base64Data, 'base64'); - const ext = mime.extension(contentType); - const filename = `${uid()}.${ext}`; - form.append('file', buffer, { - filename, - contentType, - }); - } else { - throw new Error('Invalid data URL format'); - } - } else if (parseRes) { - const { url: resourceUrl, params: resourceParams, headers: resourceHeaders, body: resourceBody } = parseRes; - config.url = resourceUrl; - config.params = resourceParams; - config.headers = resourceHeaders; - if (resourceHeaders['content-type'] === 'multipart/form-data') { - const formData = new FormData(); - Object.entries(resourceBody).forEach(([key, value]) => { - formData.append(key, value); - }); - config.data = formData; - } else { - config.data = resourceBody; - } - const response = await axios(config); - const contentType = response.headers['content-type']; - // 根据 MIME 类型获取文件扩展名 - const ext = mime.extension(contentType); - const filename = `${uid()}.${ext}`; - // 创建 FormData 实例 - form.append('file', response.data, { - filename, - contentType: response.headers['content-type'], - }); - } else { - // 下载指定 URL 的内容 - const response = await axios(config); - // 获取文件的 MIME 类型 - const contentType = response.headers['content-type']; - // 根据 MIME 类型获取文件扩展名 - const ext = mime.extension(contentType); - const filename = `${uid()}.${ext}`; - // 创建 FormData 实例 - form.append('file', response.data, { - filename, - contentType: response.headers['content-type'], - }); - } - // 发送 multipart 请求 - const origin = Gateway.getInstance().runAtLoop; - const uploadResponse = await axios({ - method: 'post', - url: origin + '/api/attachments:create', - data: form, - headers: { - ...form.getHeaders(), - Authorization: 'Bearer ' + token, + let created; + if (node?.config?.useWorker && !transaction && app.worker.available) { + created = await app.worker.callPluginMethod({ + plugin: PluginWorkflowServer, + method: 'workerWorkflowCreate', + params: { + dataSourceName, + collectionName, + origin, + token, + options, + context, + transaction, }, }); - return uploadResponse.data.data; - }; - - // 处理文件类型 - for (const attachmentField of includesFields) { - if (attachmentField.options.interface === 'attachment') { - const urls = options.values[attachmentField.options.name]; - if (Array.isArray(urls)) { - for (const i in urls) { - urls[i] = await handleResource(urls[i]); - } - } else { - const url = options.values[attachmentField.options.name]; - options.values[attachmentField.options.name] = await handleResource(url); - } - } + } else { + created = await plugin.workerWorkflowCreate({ + dataSourceName, + collectionName, + origin, + token, + options, + context, + transaction, + }); } - const created = await repository.create({ - ...options, - context: { - stack: Array.from(new Set((processor.execution.context.stack ?? []).concat(processor.execution.id))), - }, - transaction, - }); - let result = created; if (created && appends.length) { const includeFields = appends.reduce((set, field) => { diff --git a/packages/module-workflow/src/server/instructions/UpdateInstruction.ts b/packages/module-workflow/src/server/instructions/UpdateInstruction.ts index 6f8764abb7..cc093c751b 100644 --- a/packages/module-workflow/src/server/instructions/UpdateInstruction.ts +++ b/packages/module-workflow/src/server/instructions/UpdateInstruction.ts @@ -1,15 +1,10 @@ -import fs from 'fs'; -import { Readable } from 'stream'; import { parseCollectionName } from '@tachybase/data-source'; import { Gateway } from '@tachybase/server'; -import { uid } from '@tachybase/utils'; -import axios, { AxiosRequestConfig } from 'axios'; -import FormData from 'form-data'; import _ from 'lodash'; -import mime from 'mime-types'; import { Instruction } from '.'; +import PluginWorkflowServer from '..'; import { JOB_STATUS } from '../constants'; import type Processor from '../Processor'; import type { FlowNodeModel } from '../types'; @@ -19,131 +14,49 @@ export class UpdateInstruction extends Instruction { const { collection, params = {} } = node.config; const [dataSourceName, collectionName] = parseCollectionName(collection); - - const { repository } = this.workflow.app.dataSourceManager.dataSources - .get(dataSourceName) - .collectionManager.getCollection(collectionName); const options = processor.getParsedValue(params, node.id); - const c = this.workflow.app.dataSourceManager.dataSources - .get(dataSourceName) - .collectionManager.getCollection(collectionName); - const fields = c.getFields(); - const fieldNames = Object.keys(params.values); - const includesFields = fields.filter((field) => fieldNames.includes(field.options.name)); - const userId = _.get(processor.getScope(node.id), '$context.user.id', ''); const token = this.workflow.app.authManager.jwt.sign({ userId }); - const isJSON = (str) => { - try { - return JSON.parse(str); - } catch (e) { - return false; - } + const transaction = this.workflow.useDataSourceTransaction(dataSourceName, processor.transaction); + + const app = this.workflow.app; + const plugin = app.pm.get(PluginWorkflowServer); + const context = { + stack: Array.from(new Set((processor.execution.context.stack ?? []).concat(processor.execution.id))), }; - //目前可处理url,json对象,base64 - const handleResource = async (resource) => { - const parseRes = isJSON(resource); - const config: AxiosRequestConfig = { - method: 'get', - url: resource, - responseType: 'stream', - }; - const form = new FormData(); - if (resource.startsWith('data:')) { - const matches = resource.match(/^data:(.+);base64,(.+)$/); - if (matches) { - const contentType = matches[1]; - const base64Data = matches[2]; - const buffer = Buffer.from(base64Data, 'base64'); - const ext = mime.extension(contentType); - const filename = `${uid()}.${ext}`; + const origin = Gateway.getInstance().runAtLoop; - form.append('file', buffer, { - filename, - contentType, - }); - } else { - throw new Error('Invalid data URL format'); - } - } else if (parseRes) { - const { url: resourceUrl, params: resourceParams, headers: resourceHeaders, body: resourceBody } = parseRes; - config.url = resourceUrl; - config.params = resourceParams; - config.headers = resourceHeaders; - if (resourceHeaders['content-type'] === 'multipart/form-data') { - const formData = new FormData(); - Object.entries(resourceBody).forEach(([key, value]) => { - formData.append(key, value); - }); - config.data = formData; - } else { - config.data = resourceBody; - } - const response = await axios(config); - const contentType = response.headers['content-type']; - // 根据 MIME 类型获取文件扩展名 - const ext = mime.extension(contentType); - const filename = `${uid()}.${ext}`; - // 创建 FormData 实例 - form.append('file', response.data, { - filename, - contentType: response.headers['content-type'], - }); - } else { - // 下载指定 URL 的内容 - const response = await axios(config); - // 获取文件的 MIME 类型 - const contentType = response.headers['content-type']; - // 根据 MIME 类型获取文件扩展名 - const ext = mime.extension(contentType); - const filename = `${uid()}.${ext}`; - // 创建 FormData 实例 - form.append('file', response.data, { - filename, - contentType: response.headers['content-type'], - }); - } - // 发送 multipart 请求 - const origin = Gateway.getInstance().runAtLoop; - const uploadResponse = await axios({ - method: 'post', - url: origin + '/api/attachments:create', - data: form, - headers: { - ...form.getHeaders(), - Authorization: 'Bearer ' + token, + let result; + if (node?.config?.useWorker && !transaction && app.worker.available) { + result = await app.worker.callPluginMethod({ + plugin: PluginWorkflowServer, + method: 'workerWorkflowUpdate', + params: { + dataSourceName, + collectionName, + origin, + token, + options, + context, + transaction, }, }); - return uploadResponse.data.data; - }; - - // 处理文件类型 - for (const attachmentField of includesFields) { - if (attachmentField.options.interface === 'attachment') { - const urls = options.values[attachmentField.options.name]; - if (Array.isArray(urls)) { - for (const i in urls) { - urls[i] = await handleResource(urls[i]); - } - } else { - const url = options.values[attachmentField.options.name]; - options.values[attachmentField.options.name] = [await handleResource(url)]; - } - } + } else { + result = await plugin.workerWorkflowUpdate({ + dataSourceName, + collectionName, + origin, + token, + options, + context, + transaction, + }); } - const result = await repository.update({ - ...options, - context: { - stack: Array.from(new Set((processor.execution.context.stack ?? []).concat(processor.execution.id))), - }, - transaction: this.workflow.useDataSourceTransaction(dataSourceName, processor.transaction), - }); - return { - result: result.length ?? result, + result, status: JOB_STATUS.RESOLVED, }; }