@@ -109,7 +109,11 @@ sge_clear_granted_resources(lListElem *jep, lListElem *ja_task, int incslots,
                             monitoring_t *monitor, u_long64 gdi_session);
 
 static void
-reduce_queue_limit(const lList *master_centry_list, lListElem *qep, lListElem *jep, int nm, const char *rlimit_name);
+reduce_queue_limits(const lList *master_centry_list, const lListElem *gdil_ep, lListElem *qep,
+                    const lListElem *job, const lListElem *ja_task, bool is_pe_job, bool is_first_gdil_entry);
+static void
+reduce_queue_limit(const lList *master_centry_list, lListElem *qep, const lListElem *job,
+                   bool is_pe_job, bool is_first_gdil_entry, bool master_only, int nm, const char *rlimit_name);
 
 static void
 release_successor_jobs(const lListElem *jep, u_long64 gdi_session);
@@ -204,17 +208,26 @@ sge_give_job(lListElem *jep, lListElem *jatep, const lListElem *master_qep, lLis
    DPRINTF("execd host: %s\n", rhost);
 
    switch (send_slave_jobs(jep, jatep, monitor, gdi_session)) {
-      case -1:
+      case -1:
          ret = -1;
-      case 0:
          sent_slaves = 1;
          break;
-      case 1:
+      case 0:
+         sent_slaves = 1;
+         break;
+      case 1:
+         // Either this is a sequential/array job or a loosely integrated parallel job:
+         // we may send the master task immediately.
+         // Or it is a tightly integrated parallel job and all slave hosts have acknowledged
+         // the job start: then we may send the master task as well.
          sent_slaves = 0;
          break;
-      default:
-         DPRINTF("send_slave_jobs returned an unknown error code\n");
+      default:
+         DPRINTF("send_slave_jobs returned an unknown error code\n");
+         // @todo sent_slaves is initialized to 0, so we would still send the job!
+         // Cannot really happen, as send_slave_jobs() only returns -1, 0, or 1.
          ret = -1;
+         break;
    }
 
    if (!sent_slaves) {
@@ -267,7 +280,7 @@ send_slave_jobs(lListElem *jep, lListElem *jatep, monitoring_t *monitor, u_long6
 
    DENTER(TOP_LAYER);
 
-   /* do we have pe slave tasks* */
+   // do we still have pe slave tasks to be delivered?
    for_each_rw (gdil_ep, lGetList(jatep, JAT_granted_destin_identifier_list)) {
       if (lGetUlong(gdil_ep, JG_tag_slave_job)) {
          lSetUlong(jatep, JAT_next_pe_task_id, 1);
@@ -279,7 +292,7 @@ send_slave_jobs(lListElem *jep, lListElem *jatep, monitoring_t *monitor, u_long6
       DRETURN(1);
    }
 
-   /* prepare the data to be send. ... */
+   /* prepare the data to be sent ... */
 
    /* create a copy of the job */
    if ((tmpjep = copyJob(jep, jatep)) == nullptr) {
@@ -305,6 +318,8 @@ send_slave_jobs(lListElem *jep, lListElem *jatep, monitoring_t *monitor, u_long6
    lReduceDescr(&rdp, QU_Type, what);
 
    tmpjatep = lFirstRW(lGetList(tmpjep, JB_ja_tasks));
+   bool is_pe_job = lGetObject(tmpjatep, JAT_pe_object) != nullptr;
+   bool is_first_gdil_entry = true;
    for_each_rw (gdil_ep, lGetList(tmpjatep, JAT_granted_destin_identifier_list)) {
       const char *src_qname = lGetString(gdil_ep, JG_qname);
       const lListElem *src_qep = cqueue_list_locate_qinstance(master_cqueue_list, src_qname);
@@ -318,27 +333,14 @@ send_slave_jobs(lListElem *jep, lListElem *jatep, monitoring_t *monitor, u_long6
       */
      lSetString(gdil_ep, JG_processors, lGetString(src_qep, QU_processors));
 
+      // copy the referenced queue instance
      qep = lSelectElemDPack(src_qep, nullptr, rdp, what, false, nullptr, nullptr);
+      lSetObject(gdil_ep, JG_queue, qep);
 
-      /* build minimum of job request and queue resource limit */
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_s_cpu, "s_cpu");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_h_cpu, "h_cpu");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_s_core, "s_core");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_h_core, "h_core");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_s_data, "s_data");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_h_data, "h_data");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_s_stack, "s_stack");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_h_stack, "h_stack");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_s_rss, "s_rss");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_h_rss, "h_rss");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_s_fsize, "s_fsize");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_h_fsize, "h_fsize");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_s_vmem, "s_vmem");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_h_vmem, "h_vmem");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_s_rt, "s_rt");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_h_rt, "h_rt");
+      // build minimum of job request and queue resource limit
+      reduce_queue_limits(master_centry_list, gdil_ep, qep, tmpjep, tmpjatep, is_pe_job, is_first_gdil_entry);
 
-      lSetObject(gdil_ep, JG_queue, qep);
+      is_first_gdil_entry = false;
    }
 
    lFreeWhat(&what);
@@ -403,15 +405,17 @@ send_slave_jobs_wc(lListElem *jep, monitoring_t *monitor, u_long64 gdi_session)
 
       next_gdil_ep = lNextRW(gdil_ep);
 
-      if (!lGetUlong(gdil_ep, JG_tag_slave_job)) {
+      // when we are re-sending: skip the already acknowledged ones
+      if (lGetUlong(gdil_ep, JG_tag_slave_job) == 0) {
          continue;
       }
 
-      if (!(hep = host_list_locate(master_ehost_list, lGetHost(gdil_ep, JG_qhostname)))) {
+      hostname = lGetHost(gdil_ep, JG_qhostname);
+      hep = host_list_locate(master_ehost_list, hostname);
+      if (hep == nullptr) {
         ret = -1;
         break;
      }
-      hostname = lGetHost(gdil_ep, JG_qhostname);
 
      if (!simulate_execd) {
         /* do ask_commproc() only if we are missing load reports */
@@ -461,7 +465,7 @@ send_slave_jobs_wc(lListElem *jep, monitoring_t *monitor, u_long64 gdi_session)
      tgtcclr(jep, hostname);
 
      if (failed != CL_RETVAL_OK) {
-        /* we failed sending the job to the execd */
+        /* we failed to send the job to the execd */
         ERROR(MSG_COM_SENDJOBTOHOST_US, sge_u32c(lGetUlong(jep, JB_job_number)), hostname);
         ERROR("commlib error: %s\n", cl_get_error_text(failed));
         sge_mark_unheard(hep, gdi_session);
@@ -547,6 +551,14 @@ send_job(const char *rhost, lListElem *jep, lListElem *jatep, lListElem *hep, in
    what = lIntVector2What(QU_Type, queue_field);
    lReduceDescr(&rdp, QU_Type, what);
 
+   // @todo CS-1273 YES: we need to deliver the whole GDIL to the master execd (e.g. for building the PE_HOSTFILE).
+   //       BUT: do we need to do the reduce_queue_limit for all queues? Wouldn't the master queue alone be enough?
+   // @todo CS-1273 What happens on the master host of a tightly integrated job?
+   //       It first gets the SLAVE container, then the master task,
+   //       so it already has the job. What does it do with the job data sent along with the master task?
+   //       ==> The data we send here for the master task is simply thrown away, so there is no need to build it.
+   bool is_pe_job = lGetObject(tmpjatep, JAT_pe_object) != nullptr;
+   bool is_first_gdil_entry = true;
    for_each_rw (gdil_ep, lGetList(tmpjatep, JAT_granted_destin_identifier_list)) {
       const char *src_qname = lGetString(gdil_ep, JG_qname);
       const lListElem *src_qep = cqueue_list_locate_qinstance(master_cqueue_list, src_qname);
@@ -560,27 +572,14 @@ send_job(const char *rhost, lListElem *jep, lListElem *jatep, lListElem *hep, in
       */
      lSetString(gdil_ep, JG_processors, lGetString(src_qep, QU_processors));
 
+      // copy the referenced queue instance
      qep = lSelectElemDPack(src_qep, nullptr, rdp, what, false, nullptr, nullptr);
+      lSetObject(gdil_ep, JG_queue, qep);
 
-      /* build minimum of job request and queue resource limit */
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_s_cpu, "s_cpu");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_h_cpu, "h_cpu");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_s_core, "s_core");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_h_core, "h_core");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_s_data, "s_data");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_h_data, "h_data");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_s_stack, "s_stack");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_h_stack, "h_stack");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_s_rss, "s_rss");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_h_rss, "h_rss");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_s_fsize, "s_fsize");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_h_fsize, "h_fsize");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_s_vmem, "s_vmem");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_h_vmem, "h_vmem");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_s_rt, "s_rt");
-      reduce_queue_limit(master_centry_list, qep, tmpjep, QU_h_rt, "h_rt");
+      // build minimum of job request and queue resource limit
+      reduce_queue_limits(master_centry_list, gdil_ep, qep, tmpjep, tmpjatep, is_pe_job, is_first_gdil_entry);
 
-      lSetObject(gdil_ep, JG_queue, qep);
+      is_first_gdil_entry = false;
    }
 
    lFreeWhat(&what);
@@ -1677,6 +1676,64 @@ sge_clear_granted_resources(lListElem *job, lListElem *ja_task, int incslots, mo
    DRETURN_VOID;
 }
 
+static const char *
+get_requested_limit(const lListElem *job, const char *limit_name,
+                    bool is_pe_job, bool is_first_gdil_entry, bool master_only) {
+   const char *ret = nullptr;
+
+   // if the limit has been requested globally, then use this value
+   const lListElem *res = lGetElemStr(job_get_hard_resource_list(job, JRS_SCOPE_GLOBAL), CE_name, limit_name);
+   if (res != nullptr) {
+      ret = lGetString(res, CE_stringval);
+   } else {
+      if (is_pe_job) {
+         // it is a tightly integrated pe job, we might have master or slave requests
+         if (is_first_gdil_entry) {
+            // this is the first gdil entry (master task):
+            // it might hold only the master task (1 slot, job_is_first_task true) => use only the master request
+            // or the master task plus slave tasks => use the maximum of master and slave request
+            if (master_only) {
+               res = lGetElemStr(job_get_hard_resource_list(job, JRS_SCOPE_MASTER), CE_name, limit_name);
+               if (res != nullptr) {
+                  ret = lGetString(res, CE_stringval);
+               }
+            } else {
+               // compare the possibly existing master and slave requests as double values
+               double master_request = std::numeric_limits<double>::max();
+               double slave_request = std::numeric_limits<double>::max();
+               const lListElem *master_res = lGetElemStr(job_get_hard_resource_list(job, JRS_SCOPE_MASTER), CE_name, limit_name);
+               if (master_res != nullptr) {
+                  master_request = lGetDouble(master_res, CE_doubleval);
+               }
+               const lListElem *slave_res = lGetElemStr(job_get_hard_resource_list(job, JRS_SCOPE_SLAVE), CE_name, limit_name);
+               if (slave_res != nullptr) {
+                  slave_request = lGetDouble(slave_res, CE_doubleval);
+               }
+
+               // pick the maximum
+               if (master_request >= slave_request) {
+                  if (master_res != nullptr) {
+                     ret = lGetString(master_res, CE_stringval);
+                  }
+               } else {
+                  if (slave_res != nullptr) {
+                     ret = lGetString(slave_res, CE_stringval);
+                  }
+               }
+            }
+         } else {
+            // this is a slave host/queue, we might have a slave request
+            res = lGetElemStr(job_get_hard_resource_list(job, JRS_SCOPE_SLAVE), CE_name, limit_name);
+            if (res != nullptr) {
+               ret = lGetString(res, CE_stringval);
+            }
+         }
+      }
+   }
+
+   return ret;
+}
+
 /* what we do is:
    if there is a hard request for this rlimit then we replace
    the queue's value by the job request
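
Aside: the precedence that the new get_requested_limit() implements (a globally scoped request always wins; on the first gdil entry either the master request alone or the maximum of master and slave; the plain slave request everywhere else) can be restated as a small standalone sketch. This is an illustration only, using std::optional in place of the CULL CE_Type lookups; the helper name pick_limit and its parameters are hypothetical and not part of the change:

#include <limits>
#include <optional>

// Hypothetical, simplified restatement of get_requested_limit(): a missing request
// counts as "unlimited" when master and slave scopes are compared.
static std::optional<double>
pick_limit(std::optional<double> global, std::optional<double> master, std::optional<double> slave,
           bool is_pe_job, bool is_first_gdil_entry, bool master_only) {
   if (global.has_value()) {
      return global;                       // a globally scoped request always wins
   }
   if (!is_pe_job) {
      return std::nullopt;                 // sequential/array job: no scoped requests to consider
   }
   if (is_first_gdil_entry) {
      if (master_only) {
         return master;                    // the entry holds nothing but the master task
      }
      // master and slave tasks share the entry: the effective bound is the larger request
      double m = master.value_or(std::numeric_limits<double>::max());
      double s = slave.value_or(std::numeric_limits<double>::max());
      return m >= s ? master : slave;      // an absent side means "unbounded", i.e. no reduction
   }
   return slave;                           // pure slave entry
}

With only a slave request present on a mixed first entry, the sketch (like the original code) yields no value: the master task is unbounded, so the queue limit is left untouched and only the consumable default may still apply.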
@@ -1685,36 +1742,68 @@ sge_clear_granted_resources(lListElem *job, lListElem *ja_task, int incslots, mo
 */
 
 static void
-reduce_queue_limit(const lList *master_centry_list, lListElem *qep, lListElem *jep, int nm, const char *rlimit_name) {
+reduce_queue_limit(const lList *master_centry_list, lListElem *qep, const lListElem *job,
+                   bool is_pe_job, bool is_first_gdil_entry, bool master_only, int nm, const char *rlimit_name) {
    DENTER(BASIS_LAYER);
-   const char *s;
-   const lList *master_ehost_list = *ocs::DataStore::get_master_list(SGE_TYPE_EXECHOST);
-   // @todo CS-612 we also need to look at master or slave requests
-   // might be tricky for the first gdil entry (qep) as it can contain both master and slave tasks
-   const lListElem *res = lGetElemStr(job_get_hard_resource_list(jep), CE_name, rlimit_name);
-
-   if ((res != nullptr) && (s = lGetString(res, CE_stringval))) {
-      // we know: if the job was scheduled, the job request must have been <= the queue limit
-      // therefore we can just set the queue limit to the job request
-      DPRINTF("job reduces queue limit: %s = %s (was %s)\n", rlimit_name, s, lGetString(qep, nm));
-      lSetString(qep, nm, s);
+
+   const char *request = get_requested_limit(job, rlimit_name, is_pe_job, is_first_gdil_entry, master_only);
+   if (request != nullptr) {
+      // we know: if the job was scheduled, the job request must have been <= the queue limit,
+      // therefore we can simply set the queue limit to the job request
+      DPRINTF("job reduces queue limit: %s = %s (was %s)\n", rlimit_name, request, lGetString(qep, nm));
+      lSetString(qep, nm, request);
    } else { /* enforce default request if set, but only if the consumable is */
       lListElem *dcep; /* really used to manage resources of this queue, host or globally */
-      if ((dcep = centry_list_locate(master_centry_list, rlimit_name))
-          && lGetUlong(dcep, CE_consumable))
+      if ((dcep = centry_list_locate(master_centry_list, rlimit_name)) && lGetUlong(dcep, CE_consumable)) {
+         const lList *master_ehost_list = *ocs::DataStore::get_master_list(SGE_TYPE_EXECHOST);
         if (lGetSubStr(qep, CE_name, rlimit_name, QU_consumable_config_list) ||
             lGetSubStr(host_list_locate(master_ehost_list, lGetHost(qep, QU_qhostname)), CE_name, rlimit_name,
                        EH_consumable_config_list) ||
             lGetSubStr(host_list_locate(master_ehost_list, SGE_GLOBAL_NAME), CE_name, rlimit_name,
-                       EH_consumable_config_list))
-
+                       EH_consumable_config_list)) {
            /* managed at queue level, managed at host level or managed at global level */
            lSetString(qep, nm, lGetString(dcep, CE_defaultval));
+         }
+      }
    }
 
    DRETURN_VOID;
 }
 
+static void
+reduce_queue_limits(const lList *master_centry_list, const lListElem *gdil_ep, lListElem *qep, const lListElem *job, const lListElem *ja_task,
+                    bool is_pe_job, bool is_first_gdil_entry) {
+   bool only_master_task = true;
+   if (is_pe_job && is_first_gdil_entry) {
+      // if it is a tightly integrated pe job and we are looking at the first gdil_ep,
+      // we have the master task and optionally slave tasks.
+      // we want to know whether this gdil_ep holds only the master task;
+      // this is not the case when
+      // - more than one slot is granted in the queue, or
+      // - the pe setting job_is_first_task is false
+      const lListElem *pe = lGetObject(ja_task, JAT_pe_object);
+      if (lGetUlong(gdil_ep, JG_slots) > 1 ||
+          (pe != nullptr && !lGetBool(pe, PE_job_is_first_task))) {
+         only_master_task = false;
+      }
+   }
+   reduce_queue_limit(master_centry_list, qep, job, is_pe_job, is_first_gdil_entry, only_master_task, QU_s_cpu, "s_cpu");
+   reduce_queue_limit(master_centry_list, qep, job, is_pe_job, is_first_gdil_entry, only_master_task, QU_h_cpu, "h_cpu");
+   reduce_queue_limit(master_centry_list, qep, job, is_pe_job, is_first_gdil_entry, only_master_task, QU_s_core, "s_core");
+   reduce_queue_limit(master_centry_list, qep, job, is_pe_job, is_first_gdil_entry, only_master_task, QU_h_core, "h_core");
+   reduce_queue_limit(master_centry_list, qep, job, is_pe_job, is_first_gdil_entry, only_master_task, QU_s_data, "s_data");
+   reduce_queue_limit(master_centry_list, qep, job, is_pe_job, is_first_gdil_entry, only_master_task, QU_h_data, "h_data");
+   reduce_queue_limit(master_centry_list, qep, job, is_pe_job, is_first_gdil_entry, only_master_task, QU_s_stack, "s_stack");
+   reduce_queue_limit(master_centry_list, qep, job, is_pe_job, is_first_gdil_entry, only_master_task, QU_h_stack, "h_stack");
+   reduce_queue_limit(master_centry_list, qep, job, is_pe_job, is_first_gdil_entry, only_master_task, QU_s_rss, "s_rss");
+   reduce_queue_limit(master_centry_list, qep, job, is_pe_job, is_first_gdil_entry, only_master_task, QU_h_rss, "h_rss");
+   reduce_queue_limit(master_centry_list, qep, job, is_pe_job, is_first_gdil_entry, only_master_task, QU_s_fsize, "s_fsize");
+   reduce_queue_limit(master_centry_list, qep, job, is_pe_job, is_first_gdil_entry, only_master_task, QU_h_fsize, "h_fsize");
+   reduce_queue_limit(master_centry_list, qep, job, is_pe_job, is_first_gdil_entry, only_master_task, QU_s_vmem, "s_vmem");
+   reduce_queue_limit(master_centry_list, qep, job, is_pe_job, is_first_gdil_entry, only_master_task, QU_h_vmem, "h_vmem");
+   reduce_queue_limit(master_centry_list, qep, job, is_pe_job, is_first_gdil_entry, only_master_task, QU_s_rt, "s_rt");
+   reduce_queue_limit(master_centry_list, qep, job, is_pe_job, is_first_gdil_entry, only_master_task, QU_h_rt, "h_rt");
+}
 
 /* -------------------------------------------------------------------------*/
 /* unlink/rename the job specific files on disk, send event to scheduler */
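
Aside: whether the first gdil entry is "master only" (and therefore whether slave-scoped requests matter for it at all) boils down to a small predicate. A hedged standalone restatement with illustrative names, not part of the change:

// True iff the first gdil entry contains nothing but the master task:
// exactly one granted slot and a PE that counts the master task as one of its
// tasks (job_is_first_task TRUE). With job_is_first_task FALSE, or with more
// than one slot, the entry also carries slave tasks, so slave-scoped requests
// have to be taken into account as well.
static bool first_entry_is_master_only(unsigned granted_slots, bool pe_job_is_first_task) {
   return granted_slots == 1 && pe_job_is_first_task;
}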