diff --git a/src/rest-server/src/controllers/v2/job.js b/src/rest-server/src/controllers/v2/job.js index 6f59d3d3..8e5024fb 100644 --- a/src/rest-server/src/controllers/v2/job.js +++ b/src/rest-server/src/controllers/v2/job.js @@ -26,6 +26,48 @@ const { Op } = require('sequelize'); const { userProperty } = require('@pai/config/token'); const userController = require('@pai/controllers/v2/user'); +const retrieveJobInfo = async (frameworkName, jobAttemptId, requestingUser, isAdmin, vcAdmins) => { + let data; + try { + data = await job.get( + frameworkName, + jobAttemptId ? Number(jobAttemptId) : undefined, + ); + } catch (error) { + logger.error(`Error retrieving job: ${error.message}`); + throw createError( + 'Internal Server Error', + 'UnknownError', + `Failed to retrieve job ${frameworkName}.`, + ); + } + + if (data && data.jobStatus) { + const virtualCluster = data.jobStatus.virtualCluster; + const userName = data.jobStatus.username; + logger.info(`Job belongs to user: ${userName}, virtual cluster: ${virtualCluster}`); + + const isAdminOfVC = vcAdmins.includes(virtualCluster); + + if (userName !== requestingUser && !isAdmin && !isAdminOfVC) { + logger.warn(`User ${requestingUser} is not allowed to access job ${frameworkName}`); + throw createError( + 'Forbidden', + 'ForbiddenUserError', + `User ${requestingUser} is not allowed to access job ${frameworkName}.`, + ); + } + } else { + throw createError( + 'Not Found', + 'NoJobStatusError', + `Job status for ${frameworkName} is not found.`, + ); + } + logger.info(`Job ${frameworkName} retrieved successfully.`); + return data; +}; + const list = asyncHandler(async (req, res) => { // ?keyword=&username=,&vc=, // &state=,&offset=&limit=&withTotalCount=true @@ -50,7 +92,14 @@ const list = asyncHandler(async (req, res) => { } const username = req[userProperty].username; - myvcs = await userController.getUserVCs(username); + const currentvcs = await userController.getUserVCs(username); + let myvcs = await userController.getUserHistoryVCsFromUserInfo(username); + if (!myvcs || myvcs.length === 0) { + myvcs = [...currentvcs]; + } else { + myvcs = Array.from(new Set([...myvcs, ...currentvcs])); + } + if (!filters.virtualCluster || filters.virtualCluster.length === 0) { filters.virtualCluster = myvcs; } else { @@ -59,6 +108,34 @@ const list = asyncHandler(async (req, res) => { ); } + // if the user tries to access other users' jobs in his/her history VCs, + // we will block the request here + // if filtering on username is not applied, which means accessing all users' jobs, + // or filtering on multiple usernames besides of himself/herself, + // we need to do the checking + const userFilterChecking = !filters.userName || filters.userName.some((name) => name !== username); + + if (userFilterChecking) { + const isAdmin = req[userProperty].admin; + // for admin user, no need to check + if (!isAdmin) { + if (filters.virtualCluster && filters.virtualCluster.length > 0) { + // now check if the vc list only contains user's current VCs + // vc filter is already applied above, so we don't need to check empty case here + const otherVcs = filters.virtualCluster.filter( + (vc) => !currentvcs.includes(vc), + ); + if (otherVcs.length > 0) { + throw createError( + 'Forbidden', + 'ForbiddenUserError', + `User ${username} is not allowed to access other users' jobs in ${otherVcs.join(', ')}.`, + ); + } + } + } + } + if ('state' in req.query) { filters.state = req.query.state.split(','); } @@ -188,9 +265,12 @@ const list = asyncHandler(async (req, res) => { }); const get = asyncHandler(async (req, res) => { - const data = await job.get( + const data = await retrieveJobInfo( req.params.frameworkName, req.params.jobAttemptId ? Number(req.params.jobAttemptId) : undefined, + req[userProperty].username, + req[userProperty].admin, + req[userProperty].vcadmins || [], ); res.json(data); }); @@ -264,6 +344,14 @@ const execute = asyncHandler(async (req, res) => { }); const getConfig = asyncHandler(async (req, res) => { + await retrieveJobInfo( + req.params.frameworkName, + undefined, + req[userProperty].username, + req[userProperty].admin, + req[userProperty].vcadmins || [], + ); + try { const data = await job.getConfig(req.params.frameworkName); return res.status(200).type('text/yaml').send(data); @@ -322,6 +410,14 @@ const deleteTag = asyncHandler(async (req, res) => { }); const getEvents = asyncHandler(async (req, res) => { + await retrieveJobInfo( + req.params.frameworkName, + undefined, + req[userProperty].username, + req[userProperty].admin, + req[userProperty].vcadmins || [], + ); + const filters = {}; if (req.query) { if ('type' in req.query) { @@ -353,6 +449,14 @@ const getEvents = asyncHandler(async (req, res) => { }); const getLogs = asyncHandler(async (req, res) => { + await retrieveJobInfo( + req.params.frameworkName, + undefined, + req[userProperty].username, + req[userProperty].admin, + req[userProperty].vcadmins || [], + ); + try { const data = await log.getLogListFromLogServer( req.params.frameworkName, diff --git a/src/rest-server/src/controllers/v2/user.js b/src/rest-server/src/controllers/v2/user.js index 4d887e7c..682cc194 100644 --- a/src/rest-server/src/controllers/v2/user.js +++ b/src/rest-server/src/controllers/v2/user.js @@ -35,6 +35,11 @@ const getUserVCs = async (username) => { return [...virtualClusters]; }; +const getUserHistoryVCsFromUserInfo = async (username) => { + const userInfo = await userModel.getUser(username); + return userInfo.history_vclist || []; +}; + const getUser = async (req, res, next) => { try { const username = req.params.username; @@ -134,6 +139,7 @@ const createUserIfUserNotExist = async (req, res, next) => { password: userData.oid, grouplist: grouplist, extension: {}, + history_vclist: [], }; const existUser = await userModel.getUser(username).catch(() => null); @@ -810,4 +816,5 @@ module.exports = { updateUserPassword, createUser, getUserVCs, + getUserHistoryVCsFromUserInfo, }; diff --git a/src/rest-server/src/models/v2/job/k8s.js b/src/rest-server/src/models/v2/job/k8s.js index 86ad75f8..eb37c5ab 100644 --- a/src/rest-server/src/models/v2/job/k8s.js +++ b/src/rest-server/src/models/v2/job/k8s.js @@ -1556,6 +1556,33 @@ const getEvents = async (frameworkName, attributes, filters) => { } }; +const listVCsFromJob = async (username) => { + try { + logger.info(`Start to list jobs for user ${username}`); + // Remove limit: 0 so it fetches all records, and optionally add a sensible order + const frameworks = await databaseModel.Framework.findAll({ + attributes: [ + 'virtualCluster', + ], + where: { userName: username }, + }); + + const vcsSet = new Set(); + frameworks.forEach((framework) => { + if (framework.virtualCluster) { + vcsSet.add(framework.virtualCluster); + } + }); + const vcs = Array.from(vcsSet); + + logger.info(`User ${username} has accessed historical virtual clusters: ${vcs}`); + return vcs; + } catch (error) { + logger.error(`Failed to get historical virtual clusters for user ${username}: ${error}`); + return []; + } +}; + // module exports module.exports = { list, @@ -1567,4 +1594,5 @@ module.exports = { addTag, deleteTag, getEvents, + listVCsFromJob, }; diff --git a/src/rest-server/src/utils/manager/user/crudK8sSecret.js b/src/rest-server/src/utils/manager/user/crudK8sSecret.js index 19454c15..07ed4532 100644 --- a/src/rest-server/src/utils/manager/user/crudK8sSecret.js +++ b/src/rest-server/src/utils/manager/user/crudK8sSecret.js @@ -17,8 +17,10 @@ const User = require('./user'); const logger = require('@pai/config/logger'); +const groupModel = require('@pai/models/v2/group'); const k8sModel = require('@pai/models/kubernetes/kubernetes'); const { Mutex } = require('async-mutex'); +const { job } = require('@pai/models/v2/job'); const USER_NAMESPACE = process.env.PAI_USER_NAMESPACE || 'pai-user-v2'; @@ -41,6 +43,36 @@ const cache = new Map(); const readMutex = new Mutex(); +async function getHistoryVCs(name, grouplist, retrieveFromHistory=true) { + // Retrieve VC list from the user's job history + let vcsFromJob = []; + if (retrieveFromHistory) { + logger.info(`Retrieving VC list from job history for user: ${name}`); + vcsFromJob = await job.listVCsFromJob(name); + } + + // Retrieve VC list from each group the user belongs to + const vcSet = new Set(); + const vcPromises = grouplist.map(async group => { + try { + return await groupModel.getGroupVCs(group); + } catch (error) { + logger.error(`Failed to fetch VCs for group ${group}:`, error); + return []; // Return an empty array on failure + } + }); + const vcResults = await Promise.all(vcPromises); + vcResults.forEach(vcs => vcs.forEach(vc => vcSet.add(vc))); + + // Merge VC lists and remove duplicates + const mergedVCList = new Set([ + ...vcsFromJob, + ...Array.from(vcSet), + ]); + + return Array.from(mergedVCList); +} + async function read(key) { if (cache.has(key)) { logger.info(`Read user info from cache: ${key}`); @@ -86,6 +118,9 @@ async function read(key) { extension: JSON.parse( Buffer.from(userData.data.extension, 'base64').toString(), ), + history_vclist: userData.data.history_vclist + ? JSON.parse(Buffer.from(userData.data.history_vclist, 'base64').toString()) + : [], }); cache.set(key, userInstance); @@ -153,6 +188,9 @@ async function readAll() { extension: JSON.parse( Buffer.from(item.data.extension, 'base64').toString(), ), + history_vclist: item.data.history_vclist + ? JSON.parse(Buffer.from(item.data.history_vclist, 'base64').toString()) + : [], }); allUserInstance.push(userInstance); } catch (error) { @@ -194,8 +232,13 @@ async function create(key, value) { grouplist: value.grouplist, email: value.email, extension: value.extension, + history_vclist: [], }); await User.encryptUserPassword(userInstance); + + // retrieve VC list from the history job list belonging to the user if exists + userInstance.history_vclist = await getHistoryVCs(userInstance.username, userInstance.grouplist); + const userData = { metadata: { name: hexKey }, type: 'Opaque', @@ -209,6 +252,7 @@ async function create(key, value) { extension: Buffer.from(JSON.stringify(userInstance.extension)).toString( 'base64', ), + history_vclist: Buffer.from(JSON.stringify(userInstance.history_vclist)).toString('base64'), }, }; const logId = Math.floor(Math.random() * 100000); @@ -252,10 +296,26 @@ async function update(key, value, updatePassword = false) { grouplist: value.grouplist, email: value.email, extension: value.extension, + history_vclist: value.history_vclist || [], }); if (updatePassword) { await User.encryptUserPassword(userInstance); } + + // if userInstance.history_vclist is empty, set it to the retrieved VC list + // retrieve VC list from the job list belonging to the user + const vclist = await getHistoryVCs( + userInstance.username, + userInstance.grouplist, + userInstance.history_vclist.length === 0 + ); + + // Merge userInstance.history_vclist with vclist and remove duplicates + userInstance.history_vclist = Array.from(new Set([ + ...(userInstance.history_vclist || []), + ...(vclist || []), + ])); + const userData = { metadata: { name: hexKey }, data: { @@ -268,6 +328,7 @@ async function update(key, value, updatePassword = false) { extension: Buffer.from(JSON.stringify(userInstance.extension)).toString( 'base64', ), + history_vclist: Buffer.from(JSON.stringify(userInstance.history_vclist)).toString('base64'), }, }; const logId = Math.floor(Math.random() * 100000); diff --git a/src/rest-server/src/utils/manager/user/user.js b/src/rest-server/src/utils/manager/user/user.js index 86c6c609..7fee7953 100644 --- a/src/rest-server/src/utils/manager/user/user.js +++ b/src/rest-server/src/utils/manager/user/user.js @@ -27,6 +27,7 @@ const userSchema = Joi.object() grouplist: Joi.array().items(Joi.string()).required(), password: Joi.string().empty('').default(''), extension: Joi.object().pattern(/\w+/, Joi.required()).required(), + history_vclist: Joi.array().items(Joi.string()), }) .required();