-
Notifications
You must be signed in to change notification settings - Fork 26
fix(python): improve taint receiver resolution #126
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,7 +38,7 @@ const CheckerManager = require('../../common/checker-manager') | |
| const BasicRuleHandler = require('../../../../checker/common/rules-basic-handler') | ||
| const Parser = require('../../../parser/parser') | ||
| const { | ||
| ValueUtil: { Scoped, PrimitiveValue, UndefinedValue, UnionValue, SymbolValue, VoidValue }, | ||
| ValueUtil: { ObjectValue, Scoped, PrimitiveValue, UndefinedValue, UnionValue, SymbolValue, VoidValue }, | ||
| } = require('../../../util/value-util') | ||
| const logger: import('../../../../util/logger').Logger = require('../../../../util/logger')(__filename) | ||
| const Config = require('../../../../config') | ||
|
|
@@ -178,8 +178,10 @@ class PythonAnalyzer extends Analyzer { | |
|
|
||
| const fileFullPath = assembleFullPath(entryPoint.filePath, Config.maindir) | ||
| const sourceNameList = getSourceNameList() | ||
| const fileEntry = this.topScope.context.files[fileFullPath] | ||
| const fileUuid = typeof fileEntry === 'string' ? fileEntry : fileEntry?.uuid | ||
| this.refreshCtx(this.topScope.context.modules.members.get(fileFullPath)?.value, sourceNameList) | ||
| this.refreshCtx(this.symbolTable.get(this.topScope.context.files[fileFullPath])?.value, sourceNameList) | ||
| this.refreshCtx(this.symbolTable.get(fileUuid)?.value, sourceNameList) | ||
| this.refreshCtx(this.topScope.context.packages.members.get(fileFullPath), sourceNameList) | ||
|
|
||
| const { filePath } = entryPoint | ||
|
|
@@ -219,8 +221,10 @@ class PythonAnalyzer extends Analyzer { | |
| ) | ||
| const fileFullPath = assembleFullPath(entryPoint.filePath, Config.maindir) | ||
| const sourceNameList = getSourceNameList() | ||
| const fileEntry = this.topScope.context.files[fileFullPath] | ||
| const fileUuid = typeof fileEntry === 'string' ? fileEntry : fileEntry?.uuid | ||
| this.refreshCtx(this.topScope.context.modules.members.get(fileFullPath)?.value, sourceNameList) | ||
| this.refreshCtx(this.symbolTable.get(this.topScope.context.files[fileFullPath])?.value, sourceNameList) | ||
| this.refreshCtx(this.symbolTable.get(fileUuid)?.value, sourceNameList) | ||
| this.refreshCtx(this.topScope.context.packages.members.get(fileFullPath), sourceNameList) | ||
|
|
||
| this.checkerManager.checkAtSymbolInterpretOfEntryPointBefore(this, null, null, null, null) | ||
|
|
@@ -320,6 +324,117 @@ class PythonAnalyzer extends Analyzer { | |
| return result | ||
| } | ||
|
|
||
| isPythonTypeFactoryInstantiation(node: CallExpression): boolean { | ||
| const innerCall: any = node?.callee | ||
| const innerCallee = innerCall?.callee | ||
| return ( | ||
| innerCall?.type === 'CallExpression' && | ||
| innerCallee?.type === 'Identifier' && | ||
| innerCallee?.name === 'type' | ||
| ) | ||
| } | ||
|
|
||
| buildPythonTypeFactoryObject(scope: Scope, node: CallExpression): Value { | ||
| const innerCall: any = node.callee | ||
| const nameArg = innerCall?.arguments?.[0] | ||
| const className = | ||
| nameArg?.value ?? | ||
| nameArg?.raw_value ?? | ||
| nameArg?.raw ?? | ||
| nameArg?.name ?? | ||
| `type_factory_${node.loc?.start?.line ?? 'unknown'}_${node.loc?.start?.column ?? 'unknown'}` | ||
| const obj = new ObjectValue(scope.qid, { | ||
| sid: String(className), | ||
| qid: `${scope.qid}.${String(className)}`, | ||
| parent: scope, | ||
| ast: node, | ||
| }) as Value | ||
| ;(obj as any)._this = obj | ||
| return obj | ||
| } | ||
|
|
||
| recoverFunctionReceiverFromQid(fclos: any): void { | ||
| if (!fclos || fclos.vtype !== 'fclos') return | ||
| const currentThis = fclos._this || fclos.getThisObj?.() | ||
| if (currentThis && currentThis !== this.topScope && currentThis?.sid !== '<global>') return | ||
| const qid = typeof fclos.qid === 'string' ? fclos.qid : '' | ||
| const lastDot = qid.lastIndexOf('.') | ||
| if (lastDot <= 0) return | ||
| const receiver = this.resolveByQidFromSymbolTable(qid.substring(0, lastDot)) | ||
| if (receiver) { | ||
| fclos._this = receiver | ||
| } | ||
| } | ||
|
|
||
| recoverMemberCallClosure( | ||
| scope: Scope, | ||
| node: CallExpression, | ||
| state: State, | ||
| fclos: any, | ||
| callInfo: CallInfo | ||
| ): any { | ||
| if (node?.callee?.type !== 'MemberAccess') return null | ||
| const member: any = node.callee | ||
| const prop = member.property | ||
| if (!prop) return null | ||
|
|
||
| const receiver = this.processInstruction(scope, member.object, state) | ||
| const candidates = [ | ||
| receiver, | ||
| this.resolveByQidFromSymbolTable(receiver?.qid), | ||
| this.resolveByQidFromSymbolTable(receiver?.sid), | ||
| ].filter(Boolean) | ||
|
|
||
| for (const recv of candidates) { | ||
| const method = this.getMemberValue(recv, prop, state) | ||
| if (method?.vtype === 'fclos') { | ||
| method._this = recv | ||
| if (callInfo?.callArgs) callInfo.callArgs.receiver = recv | ||
| return method | ||
| } | ||
| } | ||
|
|
||
| if (fclos?.vtype === 'fclos' && fclos._this && fclos._this !== this.topScope) { | ||
| if (callInfo?.callArgs) callInfo.callArgs.receiver = fclos._this | ||
| return fclos | ||
| } | ||
|
|
||
| return null | ||
| } | ||
|
|
||
| resolveByQidFromSymbolTable(qid: any): any { | ||
| if (!qid || typeof qid !== 'string') return null | ||
| const symbolMap = this.symbolTable?.getMap?.() | ||
| if (!symbolMap) return null | ||
|
|
||
| for (const val of symbolMap.values()) { | ||
| if (val?.qid === qid || val?.sid === qid) return val | ||
| } | ||
|
|
||
| const parts = qid.split('.').filter(Boolean) | ||
| for (let i = parts.length - 1; i > 0; i--) { | ||
| const baseQid = parts.slice(0, i).join('.') | ||
| let current: any = null | ||
| for (const val of symbolMap.values()) { | ||
| if (val?.qid === baseQid || val?.sid === baseQid) { | ||
| current = val | ||
| break | ||
| } | ||
| } | ||
| if (!current) continue | ||
| for (const fieldName of parts.slice(i)) { | ||
| current = | ||
| current?.value?.[fieldName] || | ||
| current?.members?.get?.(fieldName) || | ||
| current?.getFieldValue?.(fieldName, false) | ||
| if (!current) break | ||
| } | ||
| if (current) return current | ||
| } | ||
|
|
||
| return null | ||
| } | ||
|
Comment on lines
+405
to
+436
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The function |
||
|
|
||
| /** | ||
| * | ||
| * @param scope | ||
|
|
@@ -333,8 +448,13 @@ class PythonAnalyzer extends Analyzer { | |
| einfo: state.einfo, | ||
| }) | ||
|
|
||
| const fclos = this.processInstruction(scope, node.callee, state) | ||
| if (this.isPythonTypeFactoryInstantiation(node)) { | ||
| return this.buildPythonTypeFactoryObject(scope, node) | ||
| } | ||
|
|
||
| let fclos = this.processInstruction(scope, node.callee, state) | ||
| if (!fclos) return new UndefinedValue() | ||
| this.recoverFunctionReceiverFromQid(fclos) | ||
|
|
||
| const argvalues: any[] = [] | ||
| // 参数按原始顺序处理,由 buildPythonCallArgs 标记 kind,bindCallArgs 负责绑定 | ||
|
|
@@ -349,6 +469,10 @@ class PythonAnalyzer extends Analyzer { | |
|
|
||
| // 构建结构化 callInfo,携带 keyword/spread/kwspread 信息 | ||
| const callInfo: CallInfo = { callArgs: this.buildPythonCallArgs(collectedArgs, argvalues, fclos, node) } | ||
| const recoveredFclos = this.recoverMemberCallClosure(scope, node, state, fclos, callInfo) | ||
| if (recoveredFclos) { | ||
| fclos = recoveredFclos | ||
| } | ||
|
|
||
| if (argvalues && this.checkerManager) { | ||
| this.checkerManager.checkAtFunctionCallBefore(this, scope, node, state, { | ||
|
|
@@ -481,10 +605,14 @@ class PythonAnalyzer extends Analyzer { | |
| argvalues: Value[], | ||
| callInfo: CallInfo | ||
| ): Value { | ||
| // 有 __init__ 或 __new__:走完整 buildNewObject(执行构造函数) | ||
| // 不含 fclos.ast?.cdef 条件——无 __init__ 的类走 processLibArgToRet 避免 OOM | ||
| if (fclos.members?.has('_CTOR_') || fclos.value?.['__new__']) { | ||
| const res = this.buildNewObject(fclos.ast.cdef, fclos, state, node, scope, callInfo) | ||
| const classAst: any = fclos?.ast?.cdef || fclos?.ast?.fdef || fclos?.ast | ||
| const isPythonClass = fclos?.vtype === 'class' && classAst?.type === 'ClassDefinition' | ||
|
|
||
| // Python classes without explicit __init__ still need an instance object. | ||
| // Falling back to processLibArgToRet() drops class members/methods and breaks | ||
| // chained instance-method resolution (for example b = B(); b.predict(...)). | ||
| if (isPythonClass || fclos.members?.has('_CTOR_') || fclos.value?.['__new__']) { | ||
| const res = this.buildNewObject(classAst, fclos, state, node, scope, callInfo) | ||
| if (res && this.checkerManager?.checkAtFunctionCallAfter) { | ||
| this.checkerManager.checkAtFunctionCallAfter(this, scope, node, state, { | ||
| callInfo, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change, which allows suffix matching on member access chains, appears to break an existing test case in
test/rules-basic-handler/test-match-field.ts. The testfsig "c" 不匹配 x().cexpectsmatches(member(call(id('x')), 'c'), 'c')to befalse, but this change will cause it to returntrue. This indicates a potential regression or that the test suite has not been updated to reflect this intentional change in matching logic. Please update the tests to align with this new behavior or reconsider the change if it has unintended consequences.