/*
*         OpenPBS (Portable Batch System) v2.3 Software License
*
* Copyright (c) 1999-2000 Veridian Information Solutions, Inc.
* All rights reserved.
*
* ---------------------------------------------------------------------------
* For a license to use or redistribute the OpenPBS software under conditions
* other than those described below, or to purchase support for this software,
* please contact Veridian Systems, PBS Products Department ("Licensor") at:
*
*    www.OpenPBS.org  +1 650 967-4675                  sales@OpenPBS.org
*                        877 902-4PBS (US toll-free)
* ---------------------------------------------------------------------------
*
* This license covers use of the OpenPBS v2.3 software (the "Software") at
* your site or location, and, for certain users, redistribution of the
* Software to other sites and locations.  Use and redistribution of
* OpenPBS v2.3 in source and binary forms, with or without modification,
* are permitted provided that all of the following conditions are met.
* After December 31, 2001, only conditions 3-6 must be met:
*
* 1. Commercial and/or non-commercial use of the Software is permitted
*    provided a current software registration is on file at www.OpenPBS.org.
*    If use of this software contributes to a publication, product, or
*    service, proper attribution must be given; see www.OpenPBS.org/credit.html
*
* 2. Redistribution in any form is only permitted for non-commercial,
*    non-profit purposes.  There can be no charge for the Software or any
*    software incorporating the Software.  Further, there can be no
*    expectation of revenue generated as a consequence of redistributing
*    the Software.
*
* 3. Any Redistribution of source code must retain the above copyright notice
*    and the acknowledgment contained in paragraph 6, this list of conditions
*    and the disclaimer contained in paragraph 7.
*
* 4. Any Redistribution in binary form must reproduce the above copyright
*    notice and the acknowledgment contained in paragraph 6, this list of
*    conditions and the disclaimer contained in paragraph 7 in the
*    documentation and/or other materials provided with the distribution.
*
* 5. Redistributions in any form must be accompanied by information on how to
*    obtain complete source code for the OpenPBS software and any
*    modifications and/or additions to the OpenPBS software.  The source code
*    must either be included in the distribution or be available for no more
*    than the cost of distribution plus a nominal fee, and all modifications
*    and additions to the Software must be freely redistributable by any party
*    (including Licensor) without restriction.
*
* 6. All advertising materials mentioning features or use of the Software must
*    display the following acknowledgment:
*
*     "This product includes software developed by NASA Ames Research Center,
*     Lawrence Livermore National Laboratory, and Veridian Information
*     Solutions, Inc.
*     Visit www.OpenPBS.org for OpenPBS software support,
*     products, and information."
*
* 7. DISCLAIMER OF WARRANTY
*
* THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT
* ARE EXPRESSLY DISCLAIMED.
*
* IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE
* U.S. GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
* OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
* EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* This license will be governed by the laws of the Commonwealth of Virginia,
* without reference to its choice of law rules.
*/

#include <pbs_config.h>   /* the master config generated by configure */
#include "node_manager.h"

#include <string>
#include <sstream>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include <errno.h>
#include <time.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <stdarg.h>
#include <assert.h>
#if defined(NTOHL_NEEDS_ARPA_INET_H) && defined(HAVE_ARPA_INET_H)
#include <arpa/inet.h>
#endif
#include <vector>

#include "portability.h"
#include "libpbs.h"
#include "server_limits.h"
#include "list_link.h"
#include "attribute.h"
#include "resource.h"
#include "server.h"
#include "net_connect.h"
#include "batch_request.h"
#include "work_task.h"
#include "svrfunc.h"
#include "pbs_job.h"
#include "log.h"
#include "../lib/Liblog/pbs_log.h"
#include "../lib/Liblog/log_event.h"
#include "pbs_nodes.h"
#include "dis.h"
#include "dis_init.h"
#include "resmon.h"
#include "mcom.h"
#include "utils.h"
#include "u_tree.h"
#include "threadpool.h"
#include "node_func.h" /* find_nodebyname */
#include "../lib/Libutils/u_lock_ctl.h" /* lock_node, unlock_node */
#include "../lib/Libnet/lib_net.h" /* socket_read_flush */
#include "svr_func.h" /* get_svr_attr_* */
#include "alps_functions.h"
#include "login_nodes.h"
#include "svr_connect.h" /* svr_disconnect_sock */
#include "net_cache.h"
#include "ji_mutex.h"
#include "alps_constants.h"
#include "mutex_mgr.hpp"

#define IS_VALID_STR(STR)  (((STR) != NULL) && ((STR)[0] != '\0'))

extern int              LOGLEVEL;


#if !defined(H_ERRNO_DECLARED) && !defined(_AIX)
/*extern int              h_errno;*/
#endif
  

int                     svr_totnodes = 0; /* total number nodes defined       */
int                     svr_clnodes  = 0; /* number of cluster nodes     */
int                     svr_chngNodesfile = 0; /* 1 signals want nodes file update */
int                     gpu_mode_rqstd = -1;  /* default gpu mode requested */
int                     gpu_err_reset = FALSE;    /* was a gpu errcount reset requested */
/* on server shutdown, (qmgr mods)  */

all_nodes               allnodes;

static int              num_addrnote_tasks = 0; /* number of outstanding send_cluster_addrs tasks */
pthread_mutex_t        *addrnote_mutex = NULL;

extern int              server_init_type;
extern int              has_nodes;

extern int create_a_gpusubnode(struct pbsnode *);
int        is_gpustat_get(struct pbsnode *np, char **str_ptr);

extern int              ctnodes(char *);

extern char            *path_home;
extern char            *path_nodes;
extern char            *path_nodes_new;
extern char            *path_nodestate;
extern char            *path_nodenote;
extern char            *path_nodenote_new;
extern char             server_name[];

extern struct server    server;
extern tlist_head       svr_newnodes;
extern attribute_def    node_attr_def[];   /* node attributes defs */
extern int              SvrNodeCt;
extern struct all_jobs  alljobs;

extern int              multi_mom;

#define SKIP_NONE       0
#define SKIP_EXCLUSIVE  1
#define SKIP_ANYINUSE   2
#define SKIP_NONE_REUSE 3

#ifndef MAX_BM
#define MAX_BM          64
#endif

int handle_complete_first_time(job *pjob);
int is_compute_node(char *node_id);
int hasprop(struct pbsnode *, struct prop *);
int add_job_to_node(struct pbsnode *,struct pbssubn *,short,job *);
int node_satisfies_request(struct pbsnode *,char *);
int reserve_node(struct pbsnode *,short,job *,char *,struct howl **);
int build_host_list(struct howl **,struct pbssubn *,struct pbsnode *);
int procs_available(int proc_ct);
void check_nodes(struct work_task *);
int gpu_entry_by_id(struct pbsnode *,char *, int);
job *get_job_from_jobinfo(struct jobinfo *,struct pbsnode *);

/* marks a stream as finished being serviced */
pthread_mutex_t        *node_state_mutex = NULL;





/**
**      Modified by Tom Proett <proett@nas.nasa.gov> for PBS.
*/

AvlTree                 ipaddrs = NULL;


/**
 * specialized version of tfind for looking in the ipadders tree
 * @param key - the node we are searching for
 * @return a pointer to the pbsnode
*/

struct pbsnode *tfind_addr(

  const u_long  key,
  uint16_t      port,
  char         *job_momname)

  {
  struct pbsnode *pn = AVL_find(key,port,ipaddrs);

  if (pn == NULL)
    return(NULL);

  lock_node(pn, __func__, "pn", LOGLEVEL);

  if (pn->num_node_boards == 0)
    return(pn);
  else
    {
    char *dash = NULL;
    char *plus = NULL;
    char *tmp = job_momname;

    struct pbsnode *numa = NULL;

    int index;

    plus = strchr(tmp,'+');

    if (plus != NULL)
      *plus = '\0';

    while ((tmp = strchr(tmp,'-')) != NULL)
      {
      dash = tmp;
      tmp++;
      }

    if (dash == NULL)
      {
      /* node has numa nodes but no dashes in exec host?? */
      log_err(-1, __func__, "Numa node but there's no dash in exec_host?");

      return(pn);
      }

    index = atoi(dash+1);

    numa = AVL_find(index, pn->nd_mom_port, pn->node_boards);

    unlock_node(pn, __func__, "pn->numa", LOGLEVEL);
    lock_node(numa, __func__, "numa", LOGLEVEL);

    if (plus != NULL)
      *plus = '+';

    return(numa);
    }
  } /* END tfind_addr() */





/* update_node_state - central location for updating node state */
/* NOTE:  called each time a node is marked down, each time a MOM reports node  */
/*        status, and when pbs_server sends hello/cluster_addrs */

void update_node_state(

  struct pbsnode *np,         /* I (modified) */
  int             newstate)   /* I (one of INUSE_*) */

  {
  char            log_buf[LOCAL_LOG_BUF_SIZE];

  /* No need to do anything if newstate == oldstate */
  if (np->nd_state == newstate)
    return;

  /*
   * LOGLEVEL >= 4 logs all state changes
   *          >= 2 logs down->(busy|free) changes
   *          (busy|free)->down changes are always logged
   */

  if (LOGLEVEL >= 4)
    {
    sprintf(log_buf, "adjusting state for node %s - state=%d, newstate=%d",
      (np->nd_name != NULL) ? np->nd_name : "NULL",
      np->nd_state,
      newstate);

    log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf);
    }

  log_buf[0] = '\0';

  if (newstate & INUSE_DOWN)
    {
    if (!(np->nd_state & INUSE_DOWN))
      {
      sprintf(log_buf, "node %s marked down",
        (np->nd_name != NULL) ? np->nd_name : "NULL");

      np->nd_state |= INUSE_DOWN;
      np->nd_state &= ~INUSE_UNKNOWN;
      }

    /* ignoring the obvious possibility of a "down,busy" node */
    }  /* END if (newstate & INUSE_DOWN) */
  else if (newstate & INUSE_BUSY)
    {
    if ((!(np->nd_state & INUSE_BUSY) && (LOGLEVEL >= 4)) ||
        ((np->nd_state & INUSE_DOWN) && (LOGLEVEL >= 2)))
      {
      sprintf(log_buf, "node %s marked busy",
        (np->nd_name != NULL) ? np->nd_name : "NULL");
      }

    np->nd_state |= INUSE_BUSY;

    np->nd_state &= ~INUSE_UNKNOWN;

    if (np->nd_state & INUSE_DOWN)
      {
      np->nd_state &= ~INUSE_DOWN;
      }
    }  /* END else if (newstate & INUSE_BUSY) */
  else if (newstate == INUSE_FREE)
    {
    if (((np->nd_state & INUSE_DOWN) && (LOGLEVEL >= 2)) ||
        ((np->nd_state & INUSE_BUSY) && (LOGLEVEL >= 4)))
      {
      sprintf(log_buf, "node %s marked free",
        (np->nd_name != NULL) ? np->nd_name : "NULL");
      }

    np->nd_state &= ~INUSE_BUSY;

    np->nd_state &= ~INUSE_UNKNOWN;

    if (np->nd_state & INUSE_DOWN)
      {
      np->nd_state &= ~INUSE_DOWN;
      }
    }    /* END else if (newstate == INUSE_FREE) */

  if (newstate & INUSE_UNKNOWN)
    {
    np->nd_state |= INUSE_UNKNOWN;
    }

  if ((LOGLEVEL >= 2) && (log_buf[0] != '\0'))
    {
    log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf);
    }

  return;
  }  /* END update_node_state() */




int check_node_for_job(

  struct pbsnode *pnode,
  char           *jobid)

  {
  for (int i = 0; i < (int)pnode->nd_job_usages.size(); i++)
    {
    job_usage_info *jui = pnode->nd_job_usages[i];

    if (!strcmp(jobid, jui->jobid))
      return(TRUE);
    }

  /* not found */
  return(FALSE);
  } /* END check_node_for_job() */



/*
 * record_reported_time()
 *
 * @pre-cond: vp must be a character pointer to a job id
 * @post-cond: the job's last reported time is updated. This is used to safeguard against
 * deleting jobs pre-maturely
 *
 * @param vp - the jobid of the job
 *
 */
void *record_reported_time(

  void *vp)

  {
  char *job_and_node = (char *)vp;

  if (job_and_node != NULL)
    {
    char *jobid = job_and_node;
    char *node_id;
    char *colon = strchr(job_and_node, ':');

    if (colon != NULL)
      {
      *colon = '\0';
      node_id = colon + 1;

      job *pjob = svr_find_job(jobid, TRUE);

      if (pjob != NULL)
        {
        mutex_mgr job_mutex(pjob->ji_mutex, true);

        if (pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str != NULL)
          {
          if (!strncmp(node_id, pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str, strlen(node_id)))
            pjob->ji_last_reported_time = time(NULL);
          }
        }
      }

    free(job_and_node);
    }

  return(NULL);
  } /* END record_reported_time() */




/*
 * is_job_on_node - return TRUE if this jobid is present on pnode
 */

int is_job_on_node(

  struct pbsnode *pnode, /* I */
  char           *jobid) /* I */

  {
  struct pbsnode *numa;

  int             present = FALSE;
  char           *at;
  int             i;

  if ((at = strchr(jobid, (int)'@')) != NULL)
    *at = '\0'; /* strip off @server_name */

  if (pnode->num_node_boards > 0)
    {
    /* check each subnode on each numa node for the job */
    for (i = 0; i < pnode->num_node_boards; i++)
      {
      numa = AVL_find(i,pnode->nd_mom_port,pnode->node_boards);

      lock_node(numa, __func__, "before check_node_for_job numa", LOGLEVEL);
      present = check_node_for_job(pnode,jobid);
      unlock_node(numa, __func__, "after check_node_for_job numa", LOGLEVEL);

      /* leave loop if we found the job */
      if (present != FALSE)
        break;
      } /* END for each numa node */
    }
  else
    {
    present = check_node_for_job(pnode, jobid);
    if (present == TRUE)
      {
      char *job_and_node = (char *)calloc(1, strlen(jobid) + 2 + strlen(pnode->nd_name));
      sprintf(job_and_node, "%s:%s", jobid, pnode->nd_name);
      enqueue_threadpool_request(record_reported_time, job_and_node);
      }
    }

  if (at != NULL)
    *at = '@';  /* restore @server_name */

  return(present);
  }  /* END is_job_on_node() */




/* If nodes have similiar names this will make sure the name is an exact match.
 * Not just found inside another name.
 * i.e. Machines by the name of gpu, gpuati, gpunvidia. If searching for gpu...
 * List format is similiar to: gpuati+gpu/1+gpunvidia/4+gpu/5
 */
int node_in_exechostlist(
    
  char *node_name,
  char *node_ehl)

  {
  int   rc = FALSE;
  char *cur_pos = node_ehl;
  char *new_pos = cur_pos;
  int   name_len = strlen(node_name);

  while (1)
    {
    if ((new_pos = strstr(cur_pos, node_name)) == NULL)
      break;
    else if (new_pos == node_ehl)
      {
      if ((new_pos+name_len == NULL) ||
          (*(new_pos+name_len) == '+') ||
          (*(new_pos+name_len) == '/'))
        {
        rc = TRUE;
        break;
        }
      }
    else if (*(new_pos-1) == '+')
      {
      if ((new_pos+name_len == NULL) ||
          (*(new_pos+name_len) == '+') ||
          (*(new_pos+name_len) == '/'))
        {
        rc = TRUE;
        break;
        }
      }

    cur_pos = new_pos+1;
    }

  return(rc);
  } /* END node_in_exechostlist() */




int kill_job_on_mom(

  char           *jobid,
  struct pbsnode *pnode)

  {
  batch_request *preq;
  int            rc = -1;
  int            conn;
  int            local_errno = 0;
  char           log_buf[LOCAL_LOG_BUF_SIZE];

  /* job is reported by mom but server has no record of job */
  sprintf(log_buf, "stray job %s found on %s", jobid, pnode->nd_name);
  log_err(-1, __func__, log_buf);
  
  conn = svr_connect(pnode->nd_addrs[0], pnode->nd_mom_port, &local_errno, pnode, NULL, ToServerDIS);

  if (conn >= 0)
    {
    if ((preq = alloc_br(PBS_BATCH_SignalJob)) == NULL)
      {
      log_err(-1, __func__, "unable to allocate SignalJob request-trouble!");
      svr_disconnect(conn);
      }
    else
      {
      snprintf(preq->rq_ind.rq_signal.rq_jid, sizeof(preq->rq_ind.rq_signal.rq_jid), "%s", jobid);
      snprintf(preq->rq_ind.rq_signal.rq_signame, sizeof(preq->rq_ind.rq_signal.rq_signame), "SIGKILL");
      preq->rq_extra = strdup(SYNC_KILL);
      
      unlock_node(pnode, __func__, NULL, LOGLEVEL);
      rc = issue_Drequest(conn, preq);
      free_br(preq);
      lock_node(pnode, __func__, NULL, LOGLEVEL);
      }
    }

  return(rc);
  } /* END kill_job_on_mom() */



pthread_mutex_t jobsKilledMutex = PTHREAD_MUTEX_INITIALIZER;
std::vector<std::string> jobsKilled;
#define JOB_SYNC_TIMEOUT 60 //Once a kill job has been sent to a MOM, don't send another for five minutes.



/*
 * Delayed task to remove a killed job from the list in
 * case it needs to be removed again.
 */

void remove_job_from_already_killed_list(struct work_task *pwt)
  {
  std::string *pJobID = (std::string *)pwt->wt_parm1;

  free(pwt->wt_mutex);
  free(pwt);

  if(pJobID == NULL) return;

  pthread_mutex_lock(&jobsKilledMutex);

  for(std::vector<std::string>::iterator i = jobsKilled.begin();i != jobsKilled.end();i++)
    {
    if((*i).compare(*pJobID) == 0)
      {
      jobsKilled.erase(i);
      if(i == jobsKilled.end())
        {
        break;
        }
      }
    }
  pthread_mutex_unlock(&jobsKilledMutex);

  delete pJobID;

  }

/*
 * If a job is not supposed to be on a node and we have
 * not sent a kill to that job in the last 5 minutes
 * then the job should be killed.
 */

bool job_should_be_killed(
    
  char           *jobid,
  struct pbsnode *pnode)

  {
  bool  should_be_on_node = true;
  bool  should_kill_job = false;
  job *pjob;
  
  if (strstr(jobid, server_name) != NULL)
    {
    if ((is_job_on_node(pnode, jobid)) == FALSE)
      {
      /* must lock the job before the node */
      unlock_node(pnode, __func__, NULL, LOGLEVEL);
      pjob = svr_find_job(jobid, TRUE);
      lock_node(pnode, __func__, NULL, LOGLEVEL);
      
      if (pjob != NULL)
        {
        /* job exists, but doesn't currently have resources assigned
         * to this node double check the job struct because we
         * could be in the middle of moving the job around because
         * of data staging, suspend, or rerun */            
        mutex_mgr job_mgr(pjob->ji_mutex,true);
        if (pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str == NULL)
          {
          should_be_on_node = false;
          }
        else if (node_in_exechostlist(pnode->nd_name, pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str) == FALSE)
          {
          should_be_on_node = false;
          }
        }
      else
        should_be_on_node = false;
      }
    }

  if(!should_be_on_node)
    {
    bool jobAlreadyKilled = false;
    //Job should not be on the node, see if we have already sent a kill for this job.
    pthread_mutex_lock(&jobsKilledMutex);

    for(std::vector<std::string>::iterator i = jobsKilled.begin();(i != jobsKilled.end())&&(jobAlreadyKilled == false);i++)
      {
      if((*i).compare(jobid) == 0)
        {
        jobAlreadyKilled = true;
        }
      }
    if(!jobAlreadyKilled)
      {
      should_kill_job = true;
      }
    pthread_mutex_unlock(&jobsKilledMutex);
    }

  return(should_kill_job);
  } /* END job_should_be_on_node() */


void *finish_job(

  void *vp)

  {
  char *jobid = (char *)vp;
  job  *pjob;

  if (jobid == NULL)
    return(NULL);
  else if ((pjob = svr_find_job(jobid, TRUE)) == NULL)
    {
    free(jobid);

    return(NULL);
    }
  mutex_mgr job_mgr(pjob->ji_mutex,true);
  job_mgr.set_lock_on_exit(false);

  free(jobid);

  free_nodes(pjob);

  svr_setjobstate(pjob, JOB_STATE_COMPLETE, JOB_SUBSTATE_COMPLETE, FALSE);

  handle_complete_first_time(pjob);

  return(NULL);
  } /* END finish_job() */




int remove_jobs_that_have_disappeared(

  struct pbsnode  *pnode,
  resizable_array *reported_ms_jobs,
  time_t           timestamp)

  {
  int   iter = -1;
  char  log_buf[LOCAL_LOG_BUF_SIZE];
  char *jobid;
  
  while ((jobid = (char *)pop_thing(pnode->nd_ms_jobs)) != NULL)
    {
    job *pjob;

    /* locking priority is job before node */
    unlock_node(pnode, __func__, NULL, LOGLEVEL);
    pjob = svr_find_job(jobid, TRUE);
    lock_node(pnode, __func__, NULL, LOGLEVEL);

    if (pjob == NULL)
      {
      free(jobid);

      continue;
      }
    mutex_mgr job_mgr(pjob->ji_mutex,true);

    /* 45 seconds is typically the time between intervals for each update 
     * from the mom. Add this in case a stale update is processed and the job
     * hadn't started at the time the update was sent */
    if ((pjob->ji_qs.ji_state >= JOB_STATE_EXITING) ||
        (pjob->ji_qs.ji_substate < JOB_SUBSTATE_RUNNING) ||
        (pjob->ji_wattr[JOB_ATR_start_time].at_val.at_long > timestamp - 45))
      {
      free(jobid);

      job_mgr.unlock();
      continue;
      }

    job_mgr.unlock();
    /* mom didn't report this job - it has exited unexpectedly */
    snprintf(log_buf, sizeof(log_buf),
      "Server thinks job %s is on node %s, but the mom doesn't. Terminating job",
      jobid, pnode->nd_name);
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, jobid, log_buf);  

    enqueue_threadpool_request(finish_job, jobid);
    }

  /* now replace the old resizable array with the ones currently reported */
  while ((jobid = (char *)next_thing(reported_ms_jobs, &iter)) != NULL)
    insert_thing(pnode->nd_ms_jobs, jobid);

  return(PBSE_NONE);
  } /* END remove_jobs_that_have_disappeared() */



/*
 * match_exact_jobid()
 */
int match_exact_jobid(

  const char *jobs,
  const char *jobid)

  {
  char *joblist = strdup(jobs);
  char *jobidstr = NULL;

  jobidstr = threadsafe_tokenizer(&joblist, (char *)" ");
  while (jobidstr != NULL)
    {
    if (strcmp(jobid, jobidstr) == 0)
      return(1);
    jobidstr = threadsafe_tokenizer(&joblist, " ");
    }

  return(0);
  }



/*
 * sync_node_jobs_with_moms() - remove any jobs in the pbsnode (np) that was not
 * reported by the mom that it's currently running in its status update.
 */
void sync_node_jobs_with_moms(

  struct pbsnode *np,        /* I */
  const char *jobs_in_mom)   /* I */

  {
  std::vector<std::string> jobsRemoveFromNode;
  int removealljobs = (strlen(jobs_in_mom) == 0);

  for (int i = 0; i < (int)np->nd_job_usages.size(); i++)
    {
    int removejob = 0;
    job_usage_info *jui = np->nd_job_usages[i];
    char *jobid = jui->jobid;

    if (!removealljobs)
      {
      char *p = strstr((char *)jobs_in_mom, jobid);
      /* job is in the node but not in mom */
      if (!p)
        removejob = 1;
      else if (!(match_exact_jobid(jobs_in_mom, jobid)))
        removejob = 1;
      }
    if (removejob || removealljobs)
      {
      unlock_node(np, __func__, NULL, LOGLEVEL);
      job *pjob = svr_find_job(jobid, TRUE);
      lock_node(np, __func__, NULL, LOGLEVEL);
      if (pjob)
        unlock_ji_mutex(pjob, __func__, NULL, LOGLEVEL);
      else
        jobsRemoveFromNode.push_back(std::string (jobid));
      }
    }

  char log_buf[LOCAL_LOG_BUF_SIZE + 1];
  for (std::vector<std::string>::iterator it = jobsRemoveFromNode.begin();
             it != jobsRemoveFromNode.end(); it++)
    {
    snprintf(log_buf, sizeof(log_buf),
      "Job %s was not reported in %s update status. Freeing job from node.", (*it).c_str(), np->nd_name);
    log_event(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
    remove_job_from_node(np, (*it).c_str());
    }
  } /* end of sync_node_jobs_with_moms */



/*
 * sync_node_jobs() - determine if a MOM has a stale job and possibly delete it
 *
 * This function is called every time we get a node stat from the pbs_mom.
 *
 * NOTE: changed to be processed in a thread so that processing here doesn't hinder
 * the server's ability to reply to the status
 *
 * @see is_stat_get()
 */

void *sync_node_jobs(

  void *vp)

  {
  struct pbsnode       *np;
  sync_job_info        *sji = (sync_job_info *)vp;
  char                 *raw_input;
  char                 *node_id;
  char                 *jobstring_in;
  char                 *joblist;
  char                 *jobidstr;
  long                  job_sync_timeout = JOB_SYNC_TIMEOUT;
  char                 *jobs_in_mom = NULL;

  if (vp == NULL)
    return(NULL);

  raw_input = sji->input;

  /* raw_input's format is:
   *   node name:<JOBID>[ <JOBID>]... */
  if ((jobstring_in = strchr(raw_input, ':')) != NULL)
    {
    node_id = raw_input;
    *jobstring_in = '\0';
    jobstring_in++;
    }
  else
    {
    /* bad input */
    free(raw_input);
    free(sji);

    return(NULL);
    }
    
  if ((np = find_nodebyname(node_id)) == NULL)
    {
    free(raw_input);
    free(sji);

    return(NULL);
    }

  /* FORMAT <JOBID>[ <JOBID>]... */
  jobs_in_mom = strdup(jobstring_in);
  joblist = jobstring_in;
  jobidstr = threadsafe_tokenizer(&joblist, (char *)" ");

  get_svr_attr_l(SRV_ATR_job_sync_timeout, &job_sync_timeout);

  while ((jobidstr != NULL) && 
         (isdigit(*jobidstr)) != FALSE)
    {
    if (job_should_be_killed(jobidstr, np))
      {
      if (kill_job_on_mom(jobidstr, np) == PBSE_NONE)
        {
        pthread_mutex_lock(&jobsKilledMutex);
        std::string str(jobidstr);
        jobsKilled.push_back(str);
        pthread_mutex_unlock(&jobsKilledMutex);
        set_task(WORK_Timed, 
                 time(NULL) + job_sync_timeout,
                 remove_job_from_already_killed_list,
                 (void *)new std::string(jobidstr),
                 FALSE);
        }
      }
    
    jobidstr = threadsafe_tokenizer(&joblist, " ");
    } /* END while ((jobidstr != NULL) && ...) */

  /* SUCCESS */
  free(raw_input);

  free(sji);

  if (jobs_in_mom)
    sync_node_jobs_with_moms(np, jobs_in_mom);

  unlock_node(np, __func__, NULL, LOGLEVEL);

  if (jobs_in_mom)
    free(jobs_in_mom);

  return(NULL);
  }  /* END sync_node_jobs() */



/*
 *      setup_notification -  Sets up the  mechanism for notifying
 *                            other members of the server's node
 *                            pool that a new node was added manually
 *                            via qmgr.  Actual notification occurs some
 *                            time later through the send_cluster_addrs mechanism
 */

void setup_notification(
    
  char *pname) /* I */

  {
  struct pbsnode *pnode;
  new_node       *nnew;

  if (pname != NULL)
    {
    if ((pnode = find_nodebyname(pname)) != NULL)
      {
      /* call it offline until after all nodes get the new ipaddr */
      pnode->nd_state |= INUSE_OFFLINE;
      
      nnew = (new_node *)calloc(1, sizeof(new_node));
      
      if (nnew == NULL)
        {
        unlock_node(pnode, __func__, "nnew == NULL", LOGLEVEL);
        return;
        }
      
      CLEAR_LINK(nnew->nn_link);
      
      nnew->nn_name = strdup(pname);
      
      append_link(&svr_newnodes, &nnew->nn_link, nnew);
      
      unlock_node(pnode, __func__, "nnew != NULL", LOGLEVEL);
      }
    }

  if (addrnote_mutex == NULL)
    {
    addrnote_mutex = (pthread_mutex_t *)calloc(1, sizeof(pthread_mutex_t));
    pthread_mutex_init(addrnote_mutex,NULL);
    }

  pthread_mutex_lock(addrnote_mutex);
  num_addrnote_tasks++;
  pthread_mutex_unlock(addrnote_mutex);

  return;
  } /* END setup_notification() */







/*
 **     reset gpu data in case mom reconnects with changed gpus.
 **     If we have real gpus, not virtual ones, then clear out gpu_status,
 **     gpus count and remove gpu subnodes.
 */

void clear_nvidia_gpus(

  struct pbsnode *np)  /* I */

  {
  pbs_attribute   temp;

  if ((np->nd_gpus_real) && (np->nd_ngpus > 0))
    {
    /* delete gpusubnodes by freeing it */
    free(np->nd_gpusn);
    np->nd_gpusn = NULL;

    /* reset # of gpus, etc */
    np->nd_ngpus = 0;
    np->nd_ngpus_free = 0;

    /* unset "gpu_status" node attribute */

    memset(&temp, 0, sizeof(temp));

    if (decode_arst(&temp, NULL, NULL, NULL, 0))
      {
      log_err(-1, __func__, "clear_nvidia_gpus:  cannot initialize attribute\n");

      return;
      }

    node_gpustatus_list(&temp, np, ATR_ACTION_ALTER);
    }

  return;
  }  /* END clear_nvidia_gpus() */





/* EOF on a stream received (either stream or addr must be specified) */
/* mark node down */
/* NOTE: pass in stream = -1 if you wish the stream to be optional */

void stream_eof(

  int       stream,  /* I (optional) */
  u_long    addr,  /* I (optional) */
  uint16_t  port,  /* I (optional) */
  int       ret)     /* I (ignored) */

  {
  char            log_buf[LOCAL_LOG_BUF_SIZE];
  enum conn_type  cntype = ToServerDIS;
  int             conn;
  int             my_err = 0;

  struct pbsnode *np = NULL;

  if (LOGLEVEL >= 6)
    {
    sprintf(log_buf, "stream: %d, addr: %ld, port %d", stream, addr, port);
    LOG_EVENT(PBSEVENT_JOB, PBS_EVENTCLASS_JOB, __func__, log_buf);
    }

  if (addr != 0)
    {
    np = AVL_find(addr, port, ipaddrs);
    }

  if (np == NULL)
    {
    /* cannot locate node */

    return;
    }

  /* Before we mark this node down see if we can connect */
  lock_node(np, __func__, "parent", LOGLEVEL);
  conn = svr_connect(addr, port, &my_err, np, NULL, cntype);
  if(conn >= 0)
    {
    unlock_node(np, __func__, "parent", LOGLEVEL);
    svr_disconnect(conn);
    return;
    }


  sprintf(log_buf,
    "connection to %s is no longer valid, connection may have been closed remotely, remote service may be down, or message may be corrupt (%s).  setting node state to down",
    np->nd_name,
    dis_emsg[ret]);

  log_err(-1, __func__, log_buf);

  /* mark node and all subnodes as down */

  if (np->num_node_boards > 0)
    {
    int             i;
    struct pbsnode *pnode;

    for (i = 0; i < np->num_node_boards; i++)
      {
      pnode = AVL_find(i,np->nd_mom_port,np->node_boards);

      lock_node(pnode, __func__, "subs", LOGLEVEL);
      update_node_state(pnode,INUSE_DOWN);
      unlock_node(pnode, __func__, "subs", LOGLEVEL);
      }
    }
  else
    update_node_state(np, INUSE_DOWN);

  unlock_node(np, __func__, "parent", LOGLEVEL);

  return;
  }  /* END stream_eof() */


/*
 * wrapper task that check_nodes places in the thread pool's queue
 */
void *check_nodes_work(

  void *vp)

  {
  work_task        *ptask = (struct work_task *)vp;

  struct pbsnode   *np = NULL;
  long              chk_len = 300;
  char              log_buf[LOCAL_LOG_BUF_SIZE];
  time_t            time_now = time(NULL);

  node_iterator     iter;
  
  /* load min refresh interval */
  get_svr_attr_l(SRV_ATR_check_rate, &chk_len);

  if (LOGLEVEL >= 5)
    {
    sprintf(log_buf, "verifying nodes are active (min_refresh = %d seconds)", (int)chk_len);

    log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, __func__, log_buf);
    }

  /* evaluate all nodes */
  reinitialize_node_iterator(&iter);

  while ((np = next_node(&allnodes,np,&iter)) != NULL)
    {
    if (!(np->nd_state & INUSE_DOWN))
      {
      if (np->nd_lastupdate < (time_now - chk_len)) 
        {
        if (LOGLEVEL >= 6)
          {
          sprintf(log_buf, "node %s not detected in %ld seconds, contacting mom",
          np->nd_name,
          (long int)(time_now - np->nd_lastupdate));
          
          log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, __func__, log_buf);
          }
          
        if (LOGLEVEL >= 0)
          {
          sprintf(log_buf, "node %s not detected in %ld seconds, marking node down",
            np->nd_name,
            (long int)(time_now - np->nd_lastupdate));
          
          log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, __func__, log_buf);
          }
        
        update_node_state(np, (INUSE_DOWN));    
          
        /* The node is up. Do not mark the node down, but schedule a check_nodes */
        }
      }
    } /* END for each node */

  if (ptask->wt_parm1 == NULL)
    {
    set_task(WORK_Timed, time_now + chk_len, check_nodes, (char *)NULL,FALSE);
    }

  /* since this is done via threading, we now free the task here */
  free(ptask->wt_mutex);
  free(ptask);

  return(NULL);
  } /* check_nodes_work() */




/*
 * Mark any nodes that haven't checked in as down.
 * If the node isn't down then it checks to see that the
 * last update hasn't been too long ago.
 */

void check_nodes(

  struct work_task *ptask)  /* I (modified) */

  {
  int rc = enqueue_threadpool_request(check_nodes_work,ptask);

  if (rc)
    {
    log_err(rc, __func__, "Unable to enqueue check nodes task into the threadpool");
    }
  }  /* END check_nodes() */







void *write_node_state_work(

  void *vp)

  {
  struct pbsnode *np;
  static char    *fmt = (char *)"%s %d\n";
  static FILE    *nstatef = NULL;
  int             iter = -1;

  int             savemask;

  pthread_mutex_lock(node_state_mutex);

  if (LOGLEVEL >= 5)
    {
    DBPRT(("write_node_state_work: entered\n"))
    }

  /* don't store volatile states like down and unknown */

  savemask = INUSE_OFFLINE | INUSE_RESERVE;

  if (nstatef != NULL)
    {
    fseek(nstatef, 0L, SEEK_SET); /* rewind and clear */

    if (ftruncate(fileno(nstatef), (off_t)0) != 0)
      {
      log_err(errno, __func__, "could not truncate file");

      pthread_mutex_unlock(node_state_mutex);
      
      return(NULL);
      }
    }
  else
    {
    /* need to open for first time, temporary-move to pbsd_init */

    if ((nstatef = fopen(path_nodestate, "w+")) == NULL)
      {
      log_err(errno, __func__, "could not open file");

      pthread_mutex_unlock(node_state_mutex);
      
      return(NULL);
      }
    }

  /*
  ** The only state that carries forward is if the
  ** node has been marked offline.
  */

  while ((np = next_host(&allnodes,&iter,NULL)) != NULL)
    {
    if (np->nd_state & INUSE_OFFLINE)
      {
      fprintf(nstatef, fmt, np->nd_name, np->nd_state & savemask);
      }

    unlock_node(np, __func__, NULL, LOGLEVEL);
    } /* END for each node */

  if (fflush(nstatef) != 0)
    {
    log_err(errno, __func__, "failed saving node state to disk");
    }

  fclose(nstatef);
  nstatef = NULL;

  pthread_mutex_unlock(node_state_mutex);

  return(NULL);
  } /* END write_node_state_work() */





void write_node_state(void)

  {
  int rc = enqueue_threadpool_request(write_node_state_work,NULL);

  if (rc)
    {
    log_err(rc, __func__, "Unable to enqueue write_node_state_work task into the threadpool");
    }
  }  /* END write_node_state() */



/* Create a new node_note file then overwrite the previous one.
 *
 *   The note file could get up to:
 *      (# of nodes) * (2 + MAX_NODE_NAME + MAX_NOTE)  bytes in size
 */
int write_node_note(void)

  {
  struct pbsnode *np;
  int             iter = -1;
  FILE           *nin;

  if (LOGLEVEL >= 2)
    {
    DBPRT(("%s: entered\n", __func__))
    }

  if ((nin = fopen(path_nodenote_new, "w")) == NULL)
    goto err1;

  if (svr_totnodes == 0)
    {
    log_event(
      PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, __func__, "Server has empty nodes list");

    fclose(nin);

    return(-1);
    }

  /* for each node ... */
  while ((np = next_host(&allnodes, &iter, NULL)) != NULL)
    {
    /* write node name followed by its note string */
    if ((np->nd_note != NULL) && 
        (np->nd_note[0] != '\0'))
      {
      fprintf(nin, "%s %s\n", np->nd_name, np->nd_note);
      }
    
    unlock_node(np, __func__, NULL, LOGLEVEL);
    }

  fflush(nin);

  if (ferror(nin))
    {
    fclose(nin);
    goto err1;
    }

  fclose(nin);

  if (rename(path_nodenote_new, path_nodenote) != 0)
    {
    log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, __func__,
      (char *)"replacing old node note file failed");

    return(-1);
    }

  return(PBSE_NONE);

err1:
  log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, __func__,
    (char *)"Node note file update failed");

  return(-1);
  }  /* END write_node_note() */



/*
 * free_prop - free list of prop structures created by proplist()
 */

static void free_prop(

  struct prop *prop)

  {
  struct prop *pp;

  for (pp = prop;pp != NULL;pp = prop)
    {
    prop = pp->next;

    free(pp->name);
    free(pp);
    }  /* END for (pp) */

  return;
  }    /* END free_prop() */




void *node_unreserve_work(

  void *vp)

  {
  resource_t       handle = *((resource_t *)vp);

  struct  pbsnode *np;
  int              iter = -1;

  /* clear old reserve */
  while ((np = next_host(&allnodes,&iter,NULL)) != NULL)
    {
    if (handle == RESOURCE_T_ALL)
      np->nd_np_to_be_used = 0;

    unlock_node(np, "node_unreserve_work", NULL, LOGLEVEL);
    }

  return(NULL);
  } /* END node_unreserve_work() */





/*
 * unreserve - unreserve nodes
 *
 * If handle is set to a existing resource_t, then release all nodes
 * associated with that handle, otherwise, (this is dangerous)
 * if handle == RESOURCE_T_ALL, release all nodes period.
 */

void node_unreserve(

  resource_t handle)

  {
  int rc = enqueue_threadpool_request(node_unreserve_work,NULL);

  if (rc)
    {
    log_err(rc, __func__, "Unable to enqueue node_unreserve task into the threadpool");
    }
  }  /* END node_unreserve() */




/*
** Look through the property list and make sure that all
** those marked are contained in the node.
*/

int hasprop(

  struct pbsnode *pnode,
  struct prop    *props)

  {
  struct  prop    *need;

  for (need = props;need;need = need->next)
    {

    struct prop *pp;

    if (need->mark == 0) /* not marked, skip */
      continue;

    for (pp = pnode->nd_first;pp != NULL;pp = pp->next)
      {
      if (strcmp(pp->name, need->name) == 0)
        break;  /* found it */
      }

    if (pp == NULL)
      {
      return(0);
      }
    }

  return(1);
  }  /* END hasprop() */





/*
 * see if node has the number of processors required
 * if free == SKIP_NONE,  check against total number of processors, else
 * if free != SKIP_NONE,  check against number free
 *
 * Return 1 if possible, 0 if not
 */

static int hasppn(

  struct pbsnode *pnode,     /* I */
  int             node_req,  /* I */
  int             free)      /* I */

  {
  if ((free != SKIP_NONE) &&
      (free != SKIP_NONE_REUSE) &&
      (pnode->nd_slots.get_number_free() >= node_req))
    {
    return(1);
    }

  if ((free == SKIP_NONE) && 
      (pnode->nd_slots.get_total_execution_slots() >= node_req))
    {
    return(1);
    }

  return(0);
  }  /* END hasppn() */




/*
** Count how many gpus are available for use on this node
*/
static int gpu_count(

  struct pbsnode *pnode,  /* I */
  int    freeonly)        /* I */

  {
  int  count = 0;
  char log_buf[LOCAL_LOG_BUF_SIZE];

  if ((pnode->nd_state & INUSE_OFFLINE) ||
      (pnode->nd_state & INUSE_UNKNOWN) ||
      (pnode->nd_state & INUSE_DOWN))
    {
    if (LOGLEVEL >= 7)
      {
      sprintf(log_buf,
        "Counted %d gpus %s on node %s that was skipped",
        count,
        (freeonly? "free":"available"),
        pnode->nd_name);
    
      log_ext(-1, __func__, log_buf, LOG_DEBUG);
      }
    return (count);
    }

  if (pnode->nd_gpus_real)
    {
    int j;

    for (j = 0; j < pnode->nd_ngpus; j++)
      {
      struct gpusubn *gn = pnode->nd_gpusn + j;

      /* always ignore unavailable gpus */
      if (gn->state == gpu_unavailable)
        continue;

      if (!freeonly)
        {
        count++;
        }
      else if ((gn->state == gpu_unallocated) ||
               ((gn->state == gpu_shared) &&
                (gpu_mode_rqstd == gpu_normal)))
        {
        count++;;
        }
      }
    }
  else
    {
    /* virtual gpus */
    if (freeonly)
      {
      count = pnode->nd_ngpus_free;
      }
    else
      {
      count = pnode->nd_ngpus;
      }
    }

  if (LOGLEVEL >= 7)
    {
    sprintf(log_buf,
      "Counted %d gpus %s on node %s",
      count,
      (freeonly? "free":"available"),
      pnode->nd_name);

    log_ext(-1, __func__, log_buf, LOG_DEBUG);
    }

  return(count);
  }  /* END gpu_count() */





/*
** get gpu index for this gpuid
*/
int gpu_entry_by_id(

  struct pbsnode *pnode,  /* I */
  char   *gpuid,
  int    get_empty)

  {
  if (pnode->nd_gpus_real)
    {
    int j;

    for (j = 0; j < pnode->nd_ngpus; j++)
      {
      struct gpusubn *gn = pnode->nd_gpusn + j;

      if ((gn->gpuid != NULL) && (strcmp(gpuid, gn->gpuid) == 0))
        {
        return(j);
        }
      }
    }

  /*
   * we did not find the entry.  if get_empty is set then look for an empty
   * slot.  If none is found then try to add a new entry to nd_gpusn
   */

  if (get_empty)
    {
    int j;

    for (j = 0; j < pnode->nd_ngpus; j++)
      {
      struct gpusubn *gn = pnode->nd_gpusn + j;

      if (gn->gpuid == NULL)
        {
        return(j);
        }
      }

    create_a_gpusubnode(pnode);
    return (pnode->nd_ngpus - 1);    
    }

  return (-1);
  }  /* END gpu_entry_by_id() */




/*
 * checks if a node is ok for to reshuffle
 *
 * All parameters are exactly the same as search
 * @param pnode - the node we're looking at
 *
 * @return TRUE if the node is reshuffleable for search's purposes
 */
int can_reshuffle(

  struct pbsnode *pnode,
  struct prop    *glorf,
  int             skip,
  int             vpreq,
  int             gpureq,
  int             pass)

  {
  char log_buf[LOCAL_LOG_BUF_SIZE];

  if (pnode->nd_ntype == NTYPE_CLUSTER)
    {
    if (pnode->nd_flag != thinking)
      {
      /* only shuffle nodes which have been selected above */

      return(FALSE);
      }

    if (pnode->nd_state & pass)
      return(FALSE);

    if (LOGLEVEL >= 6)
      {
      sprintf(log_buf,
        "search(2): starting eval gpus on node %s need %d(%d) mode %d has %d free %d skip %d",
        pnode->nd_name,
        gpureq,
        pnode->nd_ngpus_needed,
        gpu_mode_rqstd,
        pnode->nd_ngpus,
        gpu_count(pnode, TRUE),
        skip);

       log_ext(-1, __func__, log_buf, LOG_DEBUG);
       }

    if ((skip == SKIP_EXCLUSIVE) && 
        (vpreq < pnode->nd_slots.get_number_free()) &&
        (gpureq < gpu_count(pnode, TRUE)))
      return(FALSE);

    if ((skip == SKIP_ANYINUSE) &&
        (vpreq < pnode->nd_slots.get_number_free()) &&
        (gpureq < gpu_count(pnode, TRUE)))
      return(FALSE);

    if (!hasprop(pnode, glorf))
      return(FALSE);
    }
  else
    return(FALSE);

  return(TRUE);
  } /* can_reshuffle() */





/*
** Parse a number in a spec.
** Return 0 if okay, 1 if no number exists, -1 on error
*/

static int number(

  char **ptr,
  int   *num)

  {
  char  holder[80];
  int   i = 0;
  char *str = *ptr;
  char  log_buf[LOCAL_LOG_BUF_SIZE];

  while (isdigit(*str) && (unsigned int)(i + 1) < sizeof holder)
    holder[i++] = *str++;

  if (i == 0)
    {
    return(1);
    }

  holder[i] = '\0';

  if ((i = atoi(holder)) <= 0)
    {
    sprintf(log_buf, "zero illegal");

    return(-1);
    }

  *ptr = str;

  *num = i;

  return(0);
  }  /* END number() */




/*
** Check string to see if it is a legal property name.
** If not, return 1.
** *prop set to static char array containing the properity,
** must be copied.
*/

static int property(

  char **ptr,
  char **prop)

  {
  char        *str = *ptr;
  char        *dest = *prop;
  int          i = 0;
  char         log_buf[LOCAL_LOG_BUF_SIZE];
  long         cray_enabled = FALSE;

  get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled);

  if (!isalpha(*str))
    {
    if ((cray_enabled == FALSE) ||
        (is_compute_node(str) == FALSE))
      {
      sprintf(log_buf, "first character of property (%s) not a letter", str);
      
      return(1);
      }
    }

  while (isalnum(*str) || *str == '-' || *str == '.' || *str == '=' || *str == '_')
    dest[i++] = *str++;

  dest[i] = '\0';

  /* skip over "/vp_number" */

  if (*str == '/')
    {
    do
      {
      str++;
      }
    while (isdigit(*str));
    }

  *ptr = str;

  return(0);
  }  /* END property() */





/*
** Create a property list from a string.
** Return 0 if all is well, 1 otherwise.
*/

int proplist(

  char        **str,
  struct prop **plist,
  int          *node_req,
  int          *gpu_req,
  int          *mic_req)

  {
  struct prop *pp;
  char         name_storage[80];
  char        *pname;
  char        *pequal;
  int          have_gpus = FALSE;
  char         log_buf[LOCAL_LOG_BUF_SIZE];

  *node_req = 1; /* default to 1 processor per node */

  pname  = name_storage;
  *pname = '\0';

  for (;;)
    {
    if (property(str, &pname))
      {
      return(1);
      }

    if (*pname == '\0')
      break;

    if ((pequal = strchr(pname, (int)'=')) != NULL)
      {
      /* special property */

      /* identify the special property and place its value */
      /* into node_req       */

      *pequal = '\0';

      if (strcmp(pname, "ppn") == 0)
        {
        pequal++;

        if ((number(&pequal, node_req) != 0) || (*pequal != '\0'))
          {
          return(1);
          }
        }
      else if (strcmp(pname, "mics") == 0)
        {
        pequal++;

        if ((number(&pequal, mic_req) != PBSE_NONE) ||
            (*pequal != '\0'))
          {
          return(1);
          }
        }
      else if (strcmp(pname, "gpus") == 0)
        {
        pequal++;

        if ((number(&pequal, gpu_req) != 0) || (*pequal != '\0'))
          {
          return(1);
          }

        have_gpus = TRUE;
        gpu_err_reset = FALSE; /* default to no */

        /* default value if no other gets specified */

        gpu_mode_rqstd = gpu_exclusive_thread;
        }
      else
        {
        return(1); /* not recognized - error */
        }
      }
    else if (have_gpus && (!strcasecmp(pname, "exclusive_thread")))
      {
      gpu_mode_rqstd = gpu_exclusive_thread;
      }
    else if (have_gpus && (!strcasecmp(pname, "exclusive")))
      {
      gpu_mode_rqstd = gpu_exclusive_thread;
      }
    else if (have_gpus && (!strcasecmp(pname, "exclusive_process")))
      {
      gpu_mode_rqstd = gpu_exclusive_process;
      }
    else if (have_gpus && (!strcasecmp(pname, "default")))
      {
      gpu_mode_rqstd = gpu_normal;
      }
    else if (have_gpus && (!strcasecmp(pname, "shared")))
      {
      gpu_mode_rqstd = gpu_normal;
      }
    else if (have_gpus && (!strcasecmp(pname, "reseterr")))
      {
      gpu_err_reset = TRUE;
      }
    else
      {
      pp = (struct prop *)calloc(1, sizeof(struct prop));

      pp->mark = 1;
      pp->name = strdup(pname);
      pp->next = *plist;

      *plist = pp;
      }

    if ((have_gpus) && (LOGLEVEL >= 7))
      {
      sprintf(log_buf,
        "proplist: set needed gpu mode to %d",
        gpu_mode_rqstd);

       log_ext(-1, __func__, log_buf, LOG_DEBUG);
      }

    if (**str != ':')
      break;

    (*str)++;
    }  /* END for(;;) */

  return(PBSE_NONE);
  }  /* END proplist() */




/*
** Add the "global" spec to every sub-spec in "spec".
**      RETURNS:  allocated string buffer (must be freed externally)
*/

static char *mod_spec(

  char *spec,    /* I */
  char *global)  /* I */

  {
  char  *line;
  char  *cp;
  int    len;
  int    nsubspec;

  nsubspec = 1;

  for (cp = spec;*cp != '\0';cp++)
    {
    if (*cp == '+')
      {
      nsubspec++;
      }
    }

  len = strlen(global);

  line = (char *)calloc(1, nsubspec * (len + 1) + strlen(spec) + 1);

  if (line == NULL)
    {
    /* FAILURE */

    return(NULL);
    }

  cp = line;

  while (*spec)
    {
    if (*spec == '+')
      {
      *cp++ = ':';

      strcpy(cp, global);

      cp += len;
      }

    *cp++ = *spec++;
    }

  *cp++ = ':';

  strcpy(cp, global);

  return(line);
  }  /* END mod_spec() */



/*
 * Test a procs specification.
 *
 * Return >0 - number of procs counted in the spec if it works,
 *         0 - if it cannot be satisfied now,
 *        -1 - if it can never be satisfied.
 *
 */
int procs_available(
    
  int proc_ct)

  {
  int             iter = -1;
  int             procs_avail = 0;
  struct pbsnode *pnode;

  if (proc_ct > svr_clnodes)
    {
    /* user requested more processors than are available on the system*/
    return(-1);
    }

  while ((pnode = next_host(&allnodes,&iter,NULL)) != NULL)
    {
    procs_avail += pnode->nd_slots.get_number_free();

    unlock_node(pnode, "procs_available", NULL, LOGLEVEL);
    }

  if (proc_ct > procs_avail)
    {
    return(0);
    }

  return(procs_avail);
  } /* END procs_available() */




int node_is_spec_acceptable(

  struct pbsnode   *pnode,
  single_spec_data *spec,
  char             *ProcBMStr,
  int              *eligible_nodes)

  {
  struct prop    *prop = spec->prop;

  int             ppn_req = spec->ppn;
  int             gpu_req = spec->gpu;
  int             mic_req = spec->mic;
  int             gpu_free;
  int             np_free;
  int             mic_free;

#ifdef GEOMETRY_REQUESTS
  if (IS_VALID_STR(ProcBMStr))
    {
    if (pnode->nd_state != INUSE_FREE)
      return(FALSE);

    if (node_satisfies_request(pnode, ProcBMStr) == FALSE)
      return(FALSE);
    }
#endif

  /* NYI: check if these are necessary */
  pnode->nd_flag = okay;

  if (pnode->nd_ntype != NTYPE_CLUSTER)
    return(FALSE);

  /* make sure that the node has properties */
  if (hasprop(pnode, prop) == FALSE)
    return(FALSE);

  if ((hasppn(pnode, ppn_req, SKIP_NONE) == FALSE) ||
      (gpu_count(pnode, FALSE) < gpu_req) ||
      (pnode->nd_nmics < mic_req))
    return(FALSE);

  (*eligible_nodes)++;

  if ((pnode->nd_state & (INUSE_OFFLINE | INUSE_DOWN | INUSE_RESERVE | INUSE_JOB)) != 0)
    return(FALSE);

  gpu_free = gpu_count(pnode, TRUE) - pnode->nd_ngpus_to_be_used;
  np_free  = pnode->nd_slots.get_number_free() - pnode->nd_np_to_be_used;
  mic_free = pnode->nd_nmics_free - pnode->nd_nmics_to_be_used;
  
  if ((ppn_req > np_free) ||
      (gpu_req > gpu_free) ||
      (mic_req > mic_free))
    return(FALSE);

  return(TRUE);
  } /* END node_is_spec_acceptable() */




int parse_req_data(
    
  complete_spec_data *all_reqs)

  {
  int               i;
  int               j = 0;
  long              cray_enabled = FALSE;

  single_spec_data *req;

  get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled);
  all_reqs->total_nodes = 0;

  for (i = 0; i < all_reqs->num_reqs; i++)
    {
    req = all_reqs->reqs + i;
    req->nodes = 1;
    req->gpu   = 0;
    req->ppn   = 1;
    req->prop  = NULL;

    if ((cray_enabled == FALSE) ||
        (is_compute_node(all_reqs->req_start[i]) == FALSE))
      {
      if ((j = number(&(all_reqs->req_start[i]), &(req->nodes))) == -1)
        return(j);
      }

    if (j == 0)
      {
      /* there was a number */
      if (*(all_reqs->req_start[i]) != '\0')
        {
        if (*(all_reqs->req_start[i]) == ':')
          all_reqs->req_start[i]++;
        
        if (proplist(&(all_reqs->req_start[i]), &(req->prop), &(req->ppn), &(req->gpu), &(req->mic)))
          return(-1);
        }
      }
    else
      {
      if (*(all_reqs->req_start[i]) != '\0')
        {
        if (proplist(&(all_reqs->req_start[i]), &(req->prop), &(req->ppn), &(req->gpu), &(req->mic)))
          return(-1);
        }
      }

    all_reqs->total_nodes += req->nodes;
    }

  return(PBSE_NONE);
  } /* END parse_req_data() */




/* 
 * builds the node_job_add_info struct that will be used by set_nodes
 * instead of looping over different nodes.
 */

int save_node_for_adding(
    
  node_job_add_info *naji,
  struct pbsnode    *pnode,
  single_spec_data  *req,
  char              *first_node_name,
  int                is_external_node,
  int                req_rank)

  {
  node_job_add_info *to_add;
  node_job_add_info *old_next;
  node_job_add_info *cur_naji;
  bool               first = false;

  if ((first_node_name[0] != '\0') &&
      (!strcmp(first_node_name, pnode->nd_name)))
    {
    pnode->nd_order = 0;
    first = true;
    req_rank = 0;
    }
  else
    pnode->nd_order = 1;

  if (naji->node_name[0] == '\0')
    {
    /* first */
    snprintf(naji->node_name, sizeof(naji->node_name), "%s", pnode->nd_name);
    naji->ppn_needed = req->ppn;
    naji->gpu_needed = req->gpu;
    naji->mic_needed = req->mic;
    naji->is_external = is_external_node;
    naji->req_rank = req_rank;
    }
  else
    {
    /* second */
    if ((to_add = (node_job_add_info *)calloc(1, sizeof(node_job_add_info))) == NULL)
      {
      log_err(ENOMEM, __func__, "Cannot allocate memory!");

      return(ENOMEM);
      }
    
    if (first == true)
      {
      /* move the first element here and place me first */
      memcpy(to_add, naji, sizeof(node_job_add_info));

      snprintf(naji->node_name, sizeof(naji->node_name), "%s", pnode->nd_name);
      naji->ppn_needed = req->ppn;
      naji->gpu_needed = req->gpu;
      naji->mic_needed = req->mic;
      naji->is_external = is_external_node;
      naji->req_rank = req_rank;
      }
    else
      {
      /* initialize to_add */
      snprintf(to_add->node_name, sizeof(to_add->node_name), "%s", pnode->nd_name);
      to_add->ppn_needed = req->ppn;
      to_add->gpu_needed = req->gpu;
      to_add->mic_needed = req->mic;
      to_add->is_external = is_external_node;
      to_add->req_rank = req_rank;
      }

    /* fix pointers, NOTE: works even if old_next == NULL */
    cur_naji = naji;
    old_next = cur_naji->next;
    while(old_next != NULL)
      {
      if(to_add->req_rank <= old_next->req_rank)
        {
        cur_naji->next = to_add;
        to_add->next = old_next;
        to_add = NULL;
        break;
        }
      cur_naji = old_next;
      old_next = cur_naji->next;
      }
    if(to_add != NULL)
      {
      cur_naji->next = to_add;
      to_add->next = NULL;
      }
    }

  /* count off the number we have reserved */
  pnode->nd_np_to_be_used    += req->ppn;
  pnode->nd_ngpus_to_be_used += req->gpu;

  return(PBSE_NONE);
  } /* END save_node_for_adding */




/*
 * if there is a node being requested, the spec should look like
 * node_name[:ppn=X][+]...
 * otherwise it should look like:
 * <NUM_NODES>[:ppn=X][+]...
 *
 * If a specific node is being requested first, copy just the
 * name into first_node_name  
 */

void set_first_node_name(
    
  char *spec_param,      /* I */
  char *first_node_name) /* O */

  {
  int   i;
  int   len;

  if (isdigit(spec_param[0]) == TRUE)
    {
    first_node_name[0] = '\0';
    }
  else
    {
    len = strlen(spec_param);
    
    for (i = 0; i < len; i++)
      {
      /* a ':' means you've moved on to ppn and a + means its the next req */
      if ((spec_param[i] == ':') ||
          (spec_param[i] == '+') ||
          (spec_param[i] == '|'))
        break;
      else
        first_node_name[i] = spec_param[i];
      }
    
    /* make sure you NULL terminate */
    first_node_name[i] = '\0';
    }

  } /* END set_first_node_name() */




int is_reserved_property(

  char *prop)

  {
  if ((strncmp(prop, "ppn", strlen("ppn")) == 0) ||
      (strncmp(prop, "gpus", strlen("gpus") == 0)) ||
      (strncasecmp(prop, "exclusive_thread", strlen("exclusive_thread")) == 0) ||
      (strncasecmp(prop, "exclusive", strlen("exclusive")) == 0) ||
      (strncasecmp(prop, "exclusive_process", strlen("exclusive_process")) == 0) ||
      (strncasecmp(prop, "default", strlen("default")) == 0) ||
      (strncasecmp(prop, "shared", strlen("shared")) == 0) ||
      (strncasecmp(prop, "reseterr", strlen("reseterr")) == 0))
    return(TRUE);
  else
    return(FALSE);
  } /* END is_reserved_property() */




int is_compute_node(

  char *node_id)

  {
  struct pbsnode *pnode;
  int             rc = FALSE;
  char           *colon;
  char           *plus;

  if ((colon = strchr(node_id, ':')) != NULL)
    {
    if ((!strcmp(colon + 1, "external")) ||
        (!strcmp(colon + 1, alps_reporter_feature)) || 
        (!strcmp(colon + 1, alps_starter_feature)))
      {
      return(rc);
      }
    else
      *colon = '\0';
    }
  
  if ((plus = strchr(node_id, '+')) != NULL)
    *plus = '\0';

  if ((pnode = find_nodebyname(node_id)) != NULL)
    {
    rc = TRUE;
    unlock_node(pnode, __func__, NULL, LOGLEVEL);
    }

  if (colon != NULL)
    *colon = ':';

  if (plus != NULL)
    *plus = '+';

  return(rc);
  } /* END is_compute_node() */




void release_node_allocation(
    
  node_job_add_info *naji)

  {
  node_job_add_info *current = NULL;
  struct pbsnode    *pnode = NULL;

  current = naji;
  while (current != NULL) 
    {
    if ((pnode = find_nodebyname(current->node_name)) != NULL)
      {
      pnode->nd_np_to_be_used    -= current->ppn_needed;
      pnode->nd_ngpus_to_be_used -= current->gpu_needed;
      pnode->nd_nmics_to_be_used -= current->mic_needed;
      unlock_node(pnode, __func__, NULL, LOGLEVEL);
      }
    current = current->next;
    }
  } /* END release_node_allocation() */




int check_for_node_type(

  complete_spec_data *all_reqs,
  enum node_types     nt)

  {
  single_spec_data *req;
  int               i;
  int               found_type = FALSE;
  struct pbsnode   *pnode;
  struct pbsnode   *reporter = alps_reporter;
  struct prop      *p;

  if (reporter == NULL)
    {
    /* this shouldn't be possible */
    log_err(-1, __func__, "Checking for node types with a non-cray enabled pbs_server??");
    return(-1);
    }


  for (i = 0; i < all_reqs->num_reqs; i++)
    {
    req = all_reqs->reqs + i;

    for (p = req->prop; p != NULL; p = p->next)
      {
      if ((!strcmp(p->name, "cray_compute")) ||
          (!strcmp(p->name, alps_starter_feature)))
        continue;

      lock_node(reporter, __func__, NULL, LOGLEVEL);
      pnode = find_node_in_allnodes(&(reporter->alps_subnodes), p->name);
      unlock_node(reporter, __func__, NULL, LOGLEVEL);

      if (pnode != NULL)
        {
        unlock_node(pnode, __func__, NULL, LOGLEVEL);

        if (nt == ND_TYPE_CRAY)
          {
          found_type = TRUE;
  
          break;
          }
        }
      else if (nt != ND_TYPE_CRAY)
        {
        int login = FALSE;

        pnode = find_nodebyname(p->name);

        if (pnode != NULL)
          {
          if (pnode->nd_is_alps_login == TRUE)
            login = TRUE;

          unlock_node(pnode, __func__, NULL, LOGLEVEL);

          if (nt == ND_TYPE_EXTERNAL)
            {
            if (login == FALSE)
              found_type = TRUE;
            }
          else if (nt == ND_TYPE_LOGIN)
            if (login == TRUE)
              found_type = TRUE;

          break;
          }
        }
      }

    if (found_type == TRUE)
      break;
    }

  return(found_type);
  } /* END check_for_node_type() */




enum job_types find_job_type(

  complete_spec_data *all_reqs)

  {
  enum job_types jt = JOB_TYPE_cray;
  
  if (check_for_node_type(all_reqs, ND_TYPE_CRAY) == TRUE)
    {
    if (check_for_node_type(all_reqs, ND_TYPE_EXTERNAL) == TRUE)
      jt = JOB_TYPE_heterogeneous;
    else
      jt = JOB_TYPE_cray;
    }
  else if (check_for_node_type(all_reqs, ND_TYPE_EXTERNAL) == TRUE)
    {
    jt = JOB_TYPE_normal;
    }
  else if (check_for_node_type(all_reqs, ND_TYPE_LOGIN) == TRUE)
    jt = JOB_TYPE_login;

  return(jt);
  } /* END find_job_type() */




int add_login_node_if_needed(

  char              **first_node_name_ptr,
  char               *login_prop,
  node_job_add_info  *naji)

  {
  char             *first_node_name = *first_node_name_ptr;
  struct pbsnode   *login = find_nodebyname(first_node_name);
  int               need_to_add_login = FALSE;
  int               rc = PBSE_NONE;
  int               dummy1;
  int               dummy2;
  int               dummy3;
  struct prop      *prop = NULL;
  single_spec_data  req;

  if (login == NULL)
    need_to_add_login = TRUE;
  else
    {
    if (login->nd_is_alps_login == FALSE)
      need_to_add_login = TRUE;

    unlock_node(login, __func__, NULL, LOGLEVEL);
    }

  if (need_to_add_login == TRUE)
    {
    if (login_prop != NULL)
      {
      proplist(&login_prop, &prop, &dummy1, &dummy2, &dummy3);
      }

    if ((login = get_next_login_node(prop)) == NULL)
      rc = -1;
    else
      {
      if (naji != NULL)
        {
        /* add to naji */
        req.nodes = 1;
        req.ppn = 1;
        req.gpu = 0;
        req.mic = 0;
        req.prop = NULL;
        save_node_for_adding(naji, login, &req, login->nd_name, FALSE, -1);
        strcpy(*first_node_name_ptr, login->nd_name);
        }
      
      rc = PBSE_NONE;

      unlock_node(login, __func__, NULL, LOGLEVEL);
      }

    if (prop != NULL)
      free_prop(prop);
    }

  return(rc);
  } /* END add_login_node_if_needed() */




int node_is_external(

  struct pbsnode *pnode)

  {
  int is_external = FALSE;

  /* all logins have nd_is_alps_login set to true.
   * all cray computes have their parent pointer set to alps_reporter.
   * if neither of these are found, it must be an external node */
  if ((pnode->nd_is_alps_login == FALSE) &&
      (pnode->parent == NULL))
    is_external = TRUE;
  
  return(is_external);
  } /* END node_is_external() */




/*
 * Test a node specification.
 *
 * Return >0 - number of nodes counted in the spec if it works,
 *         0 - if it cannot be satisfied,
 *        -1 - if it can never be satisfied.
 * Okay to bail early if "early" is true.
 * VPs selected are marked "thinking"
 */

int node_spec(

  char               *spec_param, /* I */
  int                 early,      /* I (boolean) */
  int                 exactmatch, /* I (boolean) - NOT USED */
  char               *ProcBMStr,  /* I */
  char               *FailNode,   /* O (optional,minsize=1024) */
  node_job_add_info  *naji,       /* O (optional) */
  char               *EMsg,       /* O (optional,minsize=1024) */
  char               *login_prop, /* I (optional) */
  alps_req_data     **ard_array,  /* O (optional) */
  int                *num_reqs,   /* O (optional) */
  enum job_types     &job_type)

  {
  struct pbsnode      *pnode;
  char                 first_node_name[PBS_MAXHOSTNAME + 1];
  char                *first_name_ptr;
  node_iterator        iter;
  char                 log_buf[LOCAL_LOG_BUF_SIZE];

  char                *globs;
  char                *cp;
  char                *hold;
  int                  i;
  int                  num;
  int                  rc;
  int                  eligible_nodes = 0;
  complete_spec_data   all_reqs;
  char                *spec;
  char                *plus;
  long                 cray_enabled = FALSE;
  int                  num_alps_reqs = 0;

  if (EMsg != NULL)
    EMsg[0] = '\0';

  if (FailNode != NULL)
    FailNode[0] = '\0';

  if (LOGLEVEL >= 6)
    {
    sprintf(log_buf, "entered spec=%.4000s", spec_param);

    log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf);

    DBPRT(("%s\n", log_buf));
    }
  
  job_type = JOB_TYPE_normal;

  set_first_node_name(spec_param, first_node_name);
  get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled);

  spec = strdup(spec_param);

  if (spec == NULL)
    {
    /* FAILURE */
    sprintf(log_buf, "cannot alloc memory");

    if (LOGLEVEL >= 1)
      {
      log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf);
      }

    if (EMsg != NULL)
      {
      snprintf(EMsg, 1024, "%s", log_buf);
      }

    return(-1);
    }

  if ((globs = strchr(spec, '#')) != NULL)
    {
    *globs++ = '\0';

    globs = strdup(globs);

    while ((cp = strrchr(globs, '#')) != NULL)
      {
      *cp++ = '\0';

      hold = mod_spec(spec, cp);
      
      free(spec);
      
      spec = hold;
      }

    hold = mod_spec(spec, globs);
    
    free(spec);
    
    spec = hold;

    free(globs);
    }  /* END if ((globs = strchr(spec,'#')) != NULL) */

  all_reqs.num_reqs = 1;
  plus = spec;

  /* count number of reqs */
  while (*plus != '\0')
    {
    if ((*plus == '+') ||
        (*plus == '|'))
      all_reqs.num_reqs++;

    plus++;
    }

  /* allocate space in all_reqs */
  all_reqs.reqs      = (single_spec_data *)calloc(all_reqs.num_reqs, sizeof(single_spec_data));
  all_reqs.req_start = (char **)calloc(all_reqs.num_reqs, sizeof(char *));

  if ((all_reqs.reqs == NULL) ||
      (all_reqs.req_start == NULL))
    {
    if (all_reqs.reqs != NULL)
      free(all_reqs.reqs);
    else if (all_reqs.req_start != NULL)
      free(all_reqs.req_start);

    log_err(ENOMEM, __func__, "Cannot allocate memory!");
    free(spec);
    return(-1);
    }

  /* set up pointers for reqs */
  plus = spec;
  i = 0;
  all_reqs.req_start[i] = spec;
  i++;

  while (*plus != '\0')
    {
    /* make the '+' NULL and advance past it */
    if (*plus == '|')
      num_alps_reqs++;

    if ((*plus == '|') ||
        (*plus == '+'))
      {
      all_reqs.reqs[i].req_id = num_alps_reqs;
      
      *plus = '\0';
      plus++;
      
      /* advance past "nodes=" */
      if (!strncmp(plus, "nodes=", strlen("nodes=")))
        plus += strlen("nodes=");
      
      all_reqs.req_start[i] = plus;
      i++;
      }
    else
      plus++;
    }

  /* now parse each spec into the data */
  if ((rc = parse_req_data(&all_reqs)) != PBSE_NONE)
    {
    /* FAILURE */
    for (i = 0; i < all_reqs.num_reqs; i++)
      free_prop(all_reqs.reqs[i].prop);
    
    free(all_reqs.reqs);
    free(all_reqs.req_start);

    free(spec);

    return(rc);
    }

  num = all_reqs.total_nodes;

#ifndef CRAY_MOAB_PASSTHRU
  /* If we restart pbs_server while the cray is down, pbs_server won't know about
   * the computes. Don't perform this check for this case. */
  if ((cray_enabled != TRUE) || 
      (alps_reporter == NULL) ||
      (alps_reporter->alps_subnodes.ra->num != 0))
    {
    if (num > svr_clnodes)
      {
      /* FAILURE */

      free(spec);

      sprintf(log_buf, "job allocation request exceeds available cluster nodes, %d requested, %d available",
        num,
        svr_clnodes);

      if (LOGLEVEL >= 6)
        {
        log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf);
        }

      if (EMsg != NULL)
        {
        snprintf(EMsg, 1024, "%s", log_buf);
        }

      return(-1);
      }
    }
#endif

  if (LOGLEVEL >= 6)
    {
    sprintf(log_buf, "job allocation debug: %d requested, %d svr_clnodes, %d svr_totnodes",
      num,
      svr_clnodes,
      svr_totnodes);

    log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf);

    DBPRT(("%s\n", log_buf));
    }

  if (cray_enabled == TRUE)
    {
    job_type = find_job_type(&all_reqs);

    first_name_ptr = first_node_name;

    if ((job_type == JOB_TYPE_cray) ||
        (job_type == JOB_TYPE_heterogeneous))
      {
      /* naji == NULL indicates that this is a qsub not a run command,
       * we only need to assign the login when the job is run */
      if (naji != NULL)
        {
        if (add_login_node_if_needed(&first_name_ptr, login_prop, naji) != PBSE_NONE)
          {
          snprintf(log_buf, sizeof(log_buf), 
            "Couldn't find an acceptable login node for spec '%s' with feature request '%s'",
            spec_param,
            (login_prop != NULL) ? login_prop : "null");
          
          log_err(-1, __func__, log_buf);
          
          free(spec);
          
          return(-1);
          }
        }
      }

    if ((num_alps_reqs > 0) &&
        (ard_array != NULL) &&
        (job_type == JOB_TYPE_cray))
      {
      *ard_array = (alps_req_data *)calloc(num_alps_reqs + 1, sizeof(alps_req_data));
      
      for (i = 0; i <= num_alps_reqs; i++)
        (*ard_array)[i].node_list = get_dynamic_string(-1, NULL);

      *num_reqs = num_alps_reqs + 1;
      }
    }

  reinitialize_node_iterator(&iter);
  pnode = NULL;

  /* iterate over all nodes */
  while ((pnode = next_node(&allnodes,pnode,&iter)) != NULL)
    {
    /* check each req against this node to see if it satisfies it */
    for (i = 0; i < all_reqs.num_reqs; i++)
      {
      single_spec_data *req = all_reqs.reqs + i;

      if (req->nodes > 0)
        {
        if (node_is_spec_acceptable(pnode, req, ProcBMStr, &eligible_nodes) == TRUE)
          {
          if (naji != NULL)
            {
            /* for heterogeneous jobs on the cray, record the external 
             * nodes in a separate attribute */
            if ((job_type == JOB_TYPE_heterogeneous) &&
                (node_is_external(pnode) == TRUE))
              save_node_for_adding(naji, pnode, req, first_node_name, TRUE, i+1);
            else
              save_node_for_adding(naji, pnode, req, first_node_name, FALSE, i+1);

            if ((num_alps_reqs > 0) &&
                (ard_array != NULL) &&
                (*ard_array != NULL))
              {
              if ((*ard_array)[req->req_id].node_list->used != 0)
                append_char_to_dynamic_string((*ard_array)[req->req_id].node_list, ',');

              append_dynamic_string((*ard_array)[req->req_id].node_list, pnode->nd_name);

              if (req->ppn > (*ard_array)[req->req_id].ppn)
                (*ard_array)[req->req_id].ppn = req->ppn;
              }
            }

          /* decrement needed nodes */
          all_reqs.total_nodes--;
          req->nodes--;
    
          /* are all reqs satisfied? */
          if (all_reqs.total_nodes == 0)
            break;
          }
        }
      }

    /* are all reqs satisfied? */
    if (all_reqs.total_nodes == 0)
      {
      unlock_node(pnode, __func__, NULL, LOGLEVEL);
      break;
      }
    } /* END for each node */

  for (i = 0; i < all_reqs.num_reqs; i++)
    if (all_reqs.reqs[i].prop != NULL)
      free_prop(all_reqs.reqs[i].prop);
  
  free(all_reqs.reqs);
  free(all_reqs.req_start);

  free(spec);

  /* If we restart pbs_server while the cray is down, pbs_server won't know about
   * the computes. Don't perform this check for this case. */
  if ((cray_enabled != TRUE) || 
      (alps_reporter == NULL) ||
      (alps_reporter->alps_subnodes.ra->num != 0))
    {
#ifndef CRAY_MOAB_PASSTHRU
    if (eligible_nodes < num)
      {
      if ((SvrNodeCt == 0) || (SvrNodeCt < num))
        {
        /* sufficient eligible nodes do not exist */
        /* FAILURE */
        sprintf(log_buf,
          "job requesting nodes that will never be available - spec = %s",
          spec_param);

        log_err(-1, __func__, log_buf);
        if (naji != NULL)
          release_node_allocation(naji);

        return(-1);
        }
      }
#endif
    }

  if (all_reqs.total_nodes > 0)
    {
    /* nodes not currently available */
    /* FAILURE */
    sprintf(log_buf,
      "job allocation request exceeds currently available cluster nodes, %d requested, %d available",
      num,
      num - all_reqs.total_nodes);

    log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf);

    if (EMsg != NULL)
      {
      snprintf(EMsg, MAXLINE, "%s", log_buf);
      }

    if (naji != NULL)
      release_node_allocation(naji);

    return(0);
    } /* END if (all_reqs.total_nodes > 0) */

  /* SUCCESS - spec is ok */
  if (LOGLEVEL >= 6)
    {
    sprintf(log_buf, "job allocation debug(3): returning %d requested", num);

    log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf);

    DBPRT(("%s\n", log_buf));
    }

  return(num);
  }  /* END node_spec() */




#ifdef GEOMETRY_REQUESTS
/**
 * get_bitmap
 *
 * @param pjob (I) - the job whose bitmap is be retrieved
 * @param ProcBMPtr (O) - the ptr to the string where the bitmap will be stored
 * @param ProcBMSize (I) - the size of the string ProcBMPtr points to
 * @return FAILURE if there is no specified bitmap or either pjob or ProcBMStrPtr are NULL
 * @return SUCCESS otherwise
 */
int get_bitmap(

  job  *pjob,        /* I */
  int   ProcBMSize,  /* I */
  char *ProcBMPtr)   /* O */

  {
  resource     *presc;
  resource_def *prd;

  char          LocalBM[MAX_BM];

  if ((pjob == NULL) ||
      (ProcBMPtr == NULL))
    {
    return(FAILURE);
    }

  LocalBM[0] = '\0';

  /* read the bitmap from the resource list */
  prd = find_resc_def(svr_resc_def,"procs_bitmap",svr_resc_size);
  presc = find_resc_entry(&pjob->ji_wattr[JOB_ATR_resource],prd);
  
  if ((presc != NULL) && 
      (presc->rs_value.at_flags & ATR_VFLAG_SET))
    {
    snprintf(LocalBM,sizeof(LocalBM),"%s",presc->rs_value.at_val.at_str);
    }
  else
    {
    /* fail if there was no bitmap given */

    return(FAILURE);
    }

  if (LocalBM[0] == '\0')
    {
    /* fail if there was no bitmap given */

    return(FAILURE);
    }
  else
    {
    snprintf(ProcBMPtr,sizeof(LocalBM),"%s",LocalBM);
    return(SUCCESS);
    }
  } /* end get_bitmap() */




/**
 * node_satisfies_request
 *
 * @param pnode (I) - the node to check for validity
 * @param ProcBMStr (I) - the bitmap of procs requested
 * @return TRUE - if the node satisfies the bitmap, FALSE otherwise
 * @return BM_ERROR if the bitmap isn't valid
 *
 * NOTE: must always be called by a thread already locking the pnode's mutex
 */
int node_satisfies_request(

  struct pbsnode *pnode,     /* I */
  char           *ProcBMStr) /* I */

  {
  int BMLen;
  int BMIndex;

  if (IS_VALID_STR(ProcBMStr) == FALSE)
    return(BM_ERROR);

  /* nodes are exclusive when we're using bitmaps */
  if (pnode->nd_state != INUSE_FREE)
    return(FALSE);

  BMLen = strlen(ProcBMStr);

  /* process in reverse because ProcBMStr[0] referes to core index 0 */
  BMIndex = BMLen-1;

  /* check if the requested processors are available on this node */
  for (int i = 0; i < pnode->nd_slots.get_total_execution_slots() && BMIndex >= 0; i++)
    {
    /* don't check cores that aren't requested */
    if (ProcBMStr[BMIndex--] != '1')
      continue;

    if (pnode->nd_slots.is_occupied(i) == true)
      return(FALSE);
    }

  if (BMIndex >= 0)
    {
    /* this means we didn't finish checking the string -
     * the node doesn't have enough processors */

    return(FALSE);
    }

  /* passed all checks, we're good */
  return(TRUE);
  } /* END node_satisfies_request() */




/**
 * reserve_node
 *
 * @param pnode - node to reserve
 * @param pjob - the job to be added to the node
 * @param hlistptr - a pointer to the host list 
 */

job_reservation_info *reserve_node(

  struct pbsnode                      *pnode,     /* I/O */
  job                                 *pjob,      /* I */
  char                                *ProcBMStr) /* I */

  {
  if ((pnode == NULL) ||
      (pjob == NULL)  ||
      (ProcBMStr == NULL))
    {
    return(NULL);
    }

  int                   BMIndex = strlen(ProcBMStr) - 1;
  job_reservation_info *node_info = (job_reservation_info *)calloc(1, sizeof(job_reservation_info));

  /* now reserve each node */
  for (int i = 0; i < pnode->nd_slots.get_total_execution_slots() && BMIndex >= 0; i++)
    {
    /* ignore unrequested cores */
    if (ProcBMStr[BMIndex--] != '1')
      continue;

    pnode->nd_slots.reserve_execution_slot(i, node_info->est);
    }

  if (BMIndex >= 0)
    {
    /* failure */
    free(node_info);
    return(NULL);
    }
    
  job_usage_info *jui = new job_usage_info(pjob->ji_qs.ji_jobid);
    
  jui->est = node_info->est;
  snprintf(node_info->node_name, sizeof(node_info->node_name), "%s", pnode->nd_name);
  node_info->port = pnode->nd_mom_rm_port;
  pnode->nd_job_usages.push_back(jui);
    
  /* mark the node as exclusive */
  pnode->nd_state = INUSE_JOB;

  return(node_info);
  }
#endif /* GEOMETRY_REQUESTS */




/**
 * adds this job to the node's list of jobs
 * checks to be sure not to add duplicates
 *
 * conditionally updates the subnode's state
 * decrements the amount of needed nodes
 *
 * @param pnode - the node that the job is running on
 * @param nd_psn - the subnode (processor) that the job is running on
 * @param newstate - the state nodes are transitioning to when used
 * @param pjob - the job that is going to be run
 */
int add_job_to_node(

  struct pbsnode *pnode,     /* I/O */
  struct pbssubn *snp,       /* I/O */
  short           newstate,  /* I */
  job            *pjob)      /* I */

  {
  struct jobinfo *jp;
  char            log_buf[LOCAL_LOG_BUF_SIZE];

  /* NOTE:  search existing job array.  add job only if job not already in place */
  if (LOGLEVEL >= 5)
    {
    sprintf(log_buf, "allocated node %s/%d to job %s (nsnfree=%d)",
      pnode->nd_name,
      snp->index,
      pjob->ji_qs.ji_jobid,
      pnode->nd_slots.get_number_free());

    log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf);
    DBPRT(("%s\n", log_buf));
    }

  for (jp = snp->jobs;jp != NULL;jp = jp->next)
    {
    if (!(strcmp(jp->jobid, pjob->ji_qs.ji_jobid)))
      break;
    }

  if (jp == NULL)
    {
    /* add job to front of subnode job array */
    jp = (struct jobinfo *)calloc(1, sizeof(struct jobinfo));

    jp->next = snp->jobs;
    snp->jobs = jp;
    strcpy(jp->jobid, pjob->ji_qs.ji_jobid);

    /* if no free VPs, set node state */
    if ((pnode->nd_slots.get_number_free() <= 0) ||
        (pjob->ji_wattr[JOB_ATR_node_exclusive].at_val.at_long == TRUE))
      pnode->nd_state = newstate;

    if (snp->inuse == INUSE_FREE)
      {
      snp->inuse = newstate;
      }
    }

  /* decrement the amount of nodes needed */
  --pnode->nd_np_to_be_used;

  return(SUCCESS);
  } /* END add_job_to_node() */


    

int add_job_to_gpu_subnode(
    
  struct pbsnode *pnode,
  struct gpusubn *gn,
  job            *pjob)

  {
  if (!pnode->nd_gpus_real)
    {
    /* update the gpu subnode */
    strcpy(gn->jobid, pjob->ji_qs.ji_jobid);
    gn->inuse = TRUE;

    /* update the main node */
    pnode->nd_ngpus_free--;
    }

  gn->job_count++;
  pnode->nd_ngpus_to_be_used--;

  return(PBSE_NONE);
  } /* END add_job_to_gpu_subnode() */




int add_job_to_mic(

  struct pbsnode *pnode,
  int             index,
  job            *pjob)

  {
  int rc = -1;

  if (pnode->nd_micjobs[index].jobid[0] == '\0')
    {
    strcpy(pnode->nd_micjobs[index].jobid, pjob->ji_qs.ji_jobid);
    pnode->nd_nmics_free--;
    pnode->nd_nmics_to_be_used--;
    rc = PBSE_NONE;
    }

  return(rc);
  } /* END add_job_to_mic() */




int remove_job_from_nodes_mics(

  struct pbsnode *pnode,
  job            *pjob)

  {
  short i;

  for (i = 0; i < pnode->nd_nmics; i++)
    {
    if (!strcmp(pnode->nd_micjobs[i].jobid, pjob->ji_qs.ji_jobid))
      pnode->nd_micjobs[i].jobid[0] = '\0';
    }

  return(PBSE_NONE);
  } /* END remove_job_from_nodes_mics() */




/**
 * builds the host list (hlist)
 *
 * @param pnode - the node being added to the host list
 * @param hlist - the host list being built
 */ 
int build_host_list(

  struct howl    **hlistptr,  /* O */
  struct pbssubn  *snp,       /* I */
  struct pbsnode  *pnode)     /* I */
  
  {
  struct howl *curr;
  struct howl *prev;
  struct howl *hp;

  /* initialize the pointers */
  curr = (struct howl *)calloc(1, sizeof(struct howl));
  curr->order = pnode->nd_order;
  curr->name  = pnode->nd_name;
  curr->index = snp->index;
  curr->port = pnode->nd_mom_rm_port;

  /* find the proper place in the list */
  for (prev = NULL, hp = *hlistptr;hp;prev = hp, hp = hp->next)
    {
    if (curr->order <= hp->order)
      break;
    }  /* END for (prev) */

  /* set the correct pointers in the list */
  curr->next = hp;

  if (prev == NULL)
    *hlistptr = curr;
  else
    prev->next = curr;

  return(SUCCESS);
  }



int add_gpu_to_hostlist(
    
  struct howl    **hlistptr,
  struct gpusubn  *gn,
  struct pbsnode  *pnode)

  {
  struct howl *curr;
  struct howl *prev;
  struct howl *hp;
  char        *gpu_name;
  static char *gpu = (char *)"gpu";

  /* create gpu_name */
  gpu_name = (char *)calloc(1, strlen(pnode->nd_name) + strlen(gpu) + 2);
  sprintf(gpu_name, "%s-%s", pnode->nd_name, gpu);


  /* initialize the pointers */
  curr = (struct howl *)calloc(1, sizeof(struct howl));
  curr->order = pnode->nd_order;
  curr->name  = gpu_name;
  curr->index = gn->index;
  curr->port = pnode->nd_mom_rm_port;

  /* find the proper place in the list */
  for (prev = NULL, hp = *hlistptr;hp;prev = hp, hp = hp->next)
    {
    if (curr->order <= hp->order)
      break;
    }  /* END for (prev) */

  /* set the correct pointers in the list */
  curr->next = hp;

  if (prev == NULL)
    *hlistptr = curr;
  else
    prev->next = curr;

  return(SUCCESS);
  } /* END add_gpu_to_hostlist() */



/*
 * checks the gpus of pnode and places them in gpu_list as necessary
 */

int place_gpus_in_hostlist(

  struct pbsnode     *pnode,
  job                *pjob,
  node_job_add_info  *naji,
  struct howl       **gpu_list)

  {
  int             j;
  struct gpusubn *gn;

  char            log_buf[LOCAL_LOG_BUF_SIZE];

  /* place the gpus in the hostlist as well */
  for (j = 0; j < pnode->nd_ngpus && naji->gpu_needed > 0; j++)
    {
    sprintf(log_buf,
      "node: %s j %d ngpus %d need %d",
      pnode->nd_name,
      j,
      pnode->nd_ngpus,
      pnode->nd_ngpus_needed);
    
    if (LOGLEVEL >= 7)
      {
      log_ext(-1, __func__, log_buf, LOG_DEBUG);
      }
    DBPRT(("%s\n", log_buf));
    
    gn = pnode->nd_gpusn + j;

    if (pnode->nd_gpus_real)
      {
      if ((gn->state == gpu_unavailable) ||
          (gn->state == gpu_exclusive) ||
          ((((int)gn->mode == gpu_normal)) &&
           (gpu_mode_rqstd != gpu_normal) &&
           (gn->state != gpu_unallocated)))
        continue;
      }
    else
      {
      if ((gn->state == gpu_unavailable) ||
          (gn->inuse == TRUE))
        continue;
      }

    if ((gn->state == gpu_unavailable) ||
        ((gn->state == gpu_exclusive) && pnode->nd_gpus_real) ||
        ((pnode->nd_gpus_real) &&
         ((int)gn->mode == gpu_normal) &&
         ((gpu_mode_rqstd != gpu_normal) && (gn->state != gpu_unallocated))) ||
        ((!pnode->nd_gpus_real) && 
         (gn->inuse == TRUE)))
      continue;
    
    add_job_to_gpu_subnode(pnode,gn,pjob);
    naji->gpu_needed--;
    
    sprintf(log_buf,
      "ADDING gpu %s/%d to exec_gpus still need %d",
      pnode->nd_name,
      j,
      pnode->nd_ngpus_needed);

    if (LOGLEVEL >= 7)
      {
      log_ext(-1, __func__, log_buf, LOG_DEBUG);
      }
    DBPRT(("%s\n", log_buf));
    
    add_gpu_to_hostlist(gpu_list,gn,pnode);
    
    /*
     * If this a real gpu in exclusive/single job mode, or a gpu in default
     * mode and the job requested an exclusive mode then we change state
     * to exclusive so we cannot assign another job to it
     */
    
    if ((pnode->nd_gpus_real) && 
        ((gn->mode == gpu_exclusive_thread) ||
         (gn->mode == gpu_exclusive_process) ||
         ((gn->mode == gpu_normal) && 
          ((gpu_mode_rqstd == gpu_exclusive_thread) ||
           (gpu_mode_rqstd == gpu_exclusive_process)))))
      {
      gn->state = gpu_exclusive;
      
      sprintf(log_buf,
        "Setting gpu %s/%d to state EXCLUSIVE for job %s",
        pnode->nd_name,
        j,
        pjob->ji_qs.ji_jobid);
      
      if (LOGLEVEL >= 7)
        {
        log_ext(-1, __func__, log_buf, LOG_DEBUG);
        }
      }
    
    /*
     * If this a real gpu in shared/default job mode and the state is
     * unallocated then we change state to shared so only other shared jobs
     * can use it
     */
    
    if ((pnode->nd_gpus_real) && (gn->mode == gpu_normal) && 
        (gpu_mode_rqstd == gpu_normal) && (gn->state == gpu_unallocated))
      {
      gn->state = gpu_shared;
      
      sprintf(log_buf,
        "Setting gpu %s/%d to state SHARED for job %s",
        pnode->nd_name,
        j,
        pjob->ji_qs.ji_jobid);
      
      if (LOGLEVEL >= 7)
        {
        log_ext(-1, __func__, log_buf, LOG_DEBUG);
        }
      }
    }

  return(PBSE_NONE);
  } /* END place_gpus_in_hostlist() */




int add_mic_to_list(

  struct howl    **mic_list,
  struct pbsnode  *pnode,
  int              index)

  {
  struct howl *curr;
  struct howl *prev;
  struct howl *hp;
  char        *name;
  static char *mic = (char *)"mic";

  /* create gpu_name */
  name = (char *)calloc(1, strlen(pnode->nd_name) + strlen(mic) + 2);
  sprintf(name, "%s-%s", pnode->nd_name, mic);


  /* initialize the pointers */
  curr = (struct howl *)calloc(1, sizeof(struct howl));
  curr->order = pnode->nd_order;
  curr->name  = name;
  curr->index = index;
  curr->port = pnode->nd_mom_rm_port;

  /* find the proper place in the list */
  for (prev = NULL, hp = *mic_list; hp != NULL; prev = hp, hp = hp->next)
    {
    if (curr->order <= hp->order)
      break;
    }  /* END for (prev) */

  /* set the correct pointers in the list */
  curr->next = hp;

  if (prev == NULL)
    *mic_list = curr;
  else
    prev->next = curr;

  return(PBSE_NONE);
  } /* END add_mic_to_list() */




int place_mics_in_hostlist(

  struct pbsnode    *pnode,
  job               *pjob,
  node_job_add_info *naji,
  struct howl      **mic_list)

  {
  int i;

  for (i = 0; i < pnode->nd_nmics && naji->mic_needed > 0; i++)
    {
    if (add_job_to_mic(pnode, i, pjob) == PBSE_NONE)
      {
      naji->mic_needed--;
      add_mic_to_list(mic_list, pnode, i);
      }
    }

  return(PBSE_NONE);
  } /* END place_mics_in_hostlist() */




/*
 * checks the subnodes of pnode and places them in the host list
 * as necessary
 */

job_reservation_info *place_subnodes_in_hostlist(

  job                *pjob,
  struct pbsnode     *pnode,
  node_job_add_info  *naji,
  char               *ProcBMStr)

  {
#ifdef GEOMETRY_REQUESTS
  if (IS_VALID_STR(ProcBMStr))
    {
    job_reservation_info *node_info = reserve_node(pnode, pjob, ProcBMStr);

    if (node_info != NULL)
      {
      // nodes are used exclusively for GEOMETRY_REQUESTS
      pnode->nd_np_to_be_used = 0;
      naji->ppn_needed = 0;
      }

    return(node_info);
    }

#endif
  job_reservation_info   *node_info = (job_reservation_info *)calloc(1, sizeof(job_reservation_info));

  if (pnode->nd_slots.reserve_execution_slots(naji->ppn_needed, node_info->est) == PBSE_NONE)
    {
    /* SUCCESS */
    pnode->nd_np_to_be_used -= naji->ppn_needed;
    naji->ppn_needed = 0;

    node_info->port = pnode->nd_mom_rm_port;
    
    job_usage_info *jui = new job_usage_info(pjob->ji_qs.ji_jobid);
    jui->est = node_info->est;
    
    snprintf(node_info->node_name, sizeof(node_info->node_name), "%s", pnode->nd_name);
    pnode->nd_job_usages.push_back(jui);
    
    if ((pnode->nd_slots.get_number_free() <= 0) ||
        (pjob->ji_wattr[JOB_ATR_node_exclusive].at_val.at_long == TRUE))
      pnode->nd_state |= INUSE_JOB;
    }
  else
    {
    free(node_info);
    node_info = NULL;
    }

  return(node_info);
  } /* END place_subnodes_in_hostlist() */



/*
 * takes a struct howl and translates it to a string that will
 * become a job pbs_attribute (exec_hosts, exec_gpus, exec_ports)
 * NOTE: frees list (the struct howl)
 */

int translate_howl_to_string(

  struct howl  *list,
  char         *EMsg,
  int          *NCount,
  char        **str_ptr,
  char        **portstr_ptr,
  int           port)

  {
  struct howl *hp;
  struct howl *next;
  size_t       len = 1;
  int          count = 1;
  char        *str;
  char        *end;
  char        *portlist = NULL;
  char        *endport;

  for (hp = list;hp != NULL;hp = hp->next)
    {
    len += (strlen(hp->name) + 8);
    count++;
    }

  if ((str = (char *)calloc(1, len + 1)) == NULL)
    {
    log_err(ENOMEM, __func__, "Cannot allocate memory!");

    if (EMsg != NULL)
      sprintf(EMsg,"no nodes can be allocated to job");
    
    return(PBSE_RESCUNAV);
    }

  *str = '\0';

  if (port == TRUE)
    {
    /* port list will have a string of sister port addresses */
    if ((portlist = (char *)calloc(1, (count * PBS_MAXPORTNUM) + count)) == NULL)
      {
      log_err(ENOMEM, __func__, "Cannot allocate memory!");
      
      if (EMsg != NULL)
        sprintf(EMsg,"no nodes can be allocated to job");

      free(str);
      
      return(PBSE_RESCUNAV);
      }
  
    *portlist = '\0';
    }

  /* now copy in name+name+... */
  *NCount = 0;

  for (hp = list,end = str,endport = portlist; hp != NULL; hp = next)
    {
    (*NCount)++;

    sprintf(end, "%s/%d+",
      hp->name,
      hp->index);

    end += strlen(end);

    if (port == TRUE)
      {
      sprintf(endport, "%d+", hp->port);
      endport += strlen(endport);
      }

    next = hp->next;

    free(hp);
    }

  /* strip trailing '+' and assign pointers */
  str[strlen(str) - 1] = '\0';
  *str_ptr = str;

  if (port == TRUE)
    {
    portlist[strlen(portlist) - 1] = '\0';
    *portstr_ptr = portlist;
    }

  return(PBSE_NONE);
  } /* END translate_howl_to_string() */



int translate_job_reservation_info_to_string(
    
  std::vector<job_reservation_info *>  &host_info, 
  int                                  *NCount, 
  std::stringstream                    &exec_host_output,
  std::stringstream                    *exec_port_output)

  {
  bool              first = true;

  for (int hi_index = 0; hi_index < (int)host_info.size(); hi_index++)
    {
    job_reservation_info *jri = host_info[hi_index];
    int                   jri_index;
    int                   jri_iterator = -1;
    
    (*NCount)++;

    while ((jri_index = jri->est.get_next_occupied_index(jri_iterator)) != -1)
      {
      if (first == false)
        {
        exec_host_output << "+";

        if (exec_port_output != NULL)
          *exec_port_output << "+";
        }

      first = false;

      exec_host_output << jri->node_name << "/" << jri_index;

      if (exec_port_output != NULL)
        *exec_port_output << jri->port;
      }
    }

  return(PBSE_NONE);
  } /* END translate_job_reservation_info_to_string() */



/*
 * free the struct that holds the information for where the job
 * will be placed  
 **/
void free_naji(
    
  node_job_add_info *naji)

  {
  node_job_add_info *current = NULL;
  node_job_add_info *tmp = NULL;
  node_job_add_info *first = naji;

  current = naji;
  while (current != NULL) 
    {
    tmp = current;
    current = current->next;
    free(tmp);
    if (current == first)
      break;
    }
  } /* END free_naji() */



/*
 * external nodes refers only to nodes outside of the cray
 * for jobs that also have cray compute nodes
 */

int record_external_node(

  job            *pjob,
  struct pbsnode *pnode)

  {
  char         *external_nodes;
  unsigned int  len;

  if (pjob->ji_wattr[JOB_ATR_external_nodes].at_val.at_str == NULL)
    {
    pjob->ji_wattr[JOB_ATR_external_nodes].at_val.at_str = strdup(pnode->nd_name);
    pjob->ji_wattr[JOB_ATR_external_nodes].at_flags |= ATR_VFLAG_SET;
    }
  else
    {
    len = strlen(pjob->ji_wattr[JOB_ATR_external_nodes].at_val.at_str) + strlen(pnode->nd_name) + 2;
    external_nodes = (char *)calloc(1, len);

    snprintf(external_nodes, len, "%s+%s",
      pjob->ji_wattr[JOB_ATR_external_nodes].at_val.at_str, pnode->nd_name);

    free(pjob->ji_wattr[JOB_ATR_external_nodes].at_val.at_str);

    pjob->ji_wattr[JOB_ATR_external_nodes].at_val.at_str = external_nodes;
    }

  return(PBSE_NONE);
  } /* END record_external_node() */



/*
 * builds the hostlist based on the nodes=... part of the request
 */

int build_hostlist_nodes_req(

  job                                  *pjob,      /* M */
  char                                 *EMsg,      /* O */
  char                                 *spec,      /* I */
  short                                 newstate,  /* I */
  std::vector<job_reservation_info *>  &host_info, /* O */
  struct howl                         **gpu_list,  /* O */
  struct howl                         **mic_list,  /* O */ 
  node_job_add_info                    *naji,      /* I - freed */
  char                                 *ProcBMStr) /* I */

  {
  struct pbsnode    *pnode = NULL;

  node_job_add_info *current;
  char               log_buf[LOCAL_LOG_BUF_SIZE];
  bool               failure = false;

  current = naji;

  while (current != NULL)
    {
    if ((pnode = find_nodebyname(current->node_name)) != NULL)
      {
      if (failure == true)
        {
        /* just remove the marked request from the node */
        pnode->nd_np_to_be_used    -= current->ppn_needed;
        pnode->nd_ngpus_to_be_used -= current->gpu_needed;
        pnode->nd_nmics_to_be_used -= current->mic_needed;
        }
      else
        {
        job_reservation_info *host_single = place_subnodes_in_hostlist(pjob, pnode, current, ProcBMStr);

        if (host_single != NULL)
          {
          host_info.push_back(host_single);
          place_gpus_in_hostlist(pnode, pjob, current, gpu_list);
          place_mics_in_hostlist(pnode, pjob, current, mic_list);
        
          if (current->is_external == TRUE)
            {
            record_external_node(pjob, pnode);
            }
          }

        /* NOTE: continue through the loop if failure is true just to clean up amounts needed */

        if ((naji->gpu_needed > 0) || 
            (naji->ppn_needed > 0) ||
            (naji->mic_needed > 0))
          {
          failure = true;
       
          /* remove any remaining things marked on the node */
          pnode->nd_np_to_be_used    -= current->ppn_needed;
          pnode->nd_ngpus_to_be_used -= current->gpu_needed;
          pnode->nd_nmics_to_be_used -= current->mic_needed;
          }
        }

      unlock_node(pnode, __func__, NULL, LOGLEVEL);
      }

    current = current->next;
    } /* END processing reserved nodes */
   
  free_naji(naji);

  if (failure == true)
    {
    /* did not satisfy the request */
    if (EMsg != NULL)
      {
      sprintf(log_buf,
        "could not locate requested gpu resources '%.4000s' (node_spec failed) %s",
        spec,
        EMsg);
      
      log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buf);
      }

    return(PBSE_RESCUNAV);
    }

  return(PBSE_NONE);
  } /* END build_hostlist_nodes_req() */




int build_hostlist_procs_req(

  job                                 *pjob,     /* M */
  int                                  procs,    /* I */
  short                                newstate, /* I */
  std::vector<job_reservation_info *> &host_info) /* O */

  {
  int             procs_needed;
  node_iterator   iter;
  struct pbsnode *pnode = NULL;

  /* did we have a request for procs? Do those now */
  if (procs > 0)
    {
    /* check to see if a -l nodes request was made */
    if (pjob->ji_have_nodes_request)
      {
      procs_needed = procs;
      }
    else
      {
      /* the qsub request used -l procs only. No -l nodes=x
         was given in the qsub request.
         TORQUE allocates 1 node by default if a -l nodes specification
         is not given.
      */
      if (procs > 1)
        {
        procs_needed = procs - 1;
        }
      else
        procs_needed = 1;
      }
  
    reinitialize_node_iterator(&iter);

    while ((pnode = next_node(&allnodes,pnode,&iter)) != NULL)
      {
      int execution_slots_free = pnode->nd_slots.get_number_free();

      if (execution_slots_free > 0)
        {
        job_reservation_info   *node_info = (job_reservation_info *)calloc(1, sizeof(job_reservation_info));
        if (pnode->nd_slots.reserve_execution_slots(execution_slots_free, node_info->est) == PBSE_NONE)
          {
          procs_needed -= execution_slots_free;

          host_info.push_back(node_info);
          node_info->port = pnode->nd_mom_rm_port;
          }
        }
      } /* END for each node */
    } /* if (procs > 0) */

  return(PBSE_NONE);
  } /* END build_hostlist_procs_req() */




int add_to_ms_list(
    
  char *node_name,
  job  *pjob)

  {
  struct pbsnode *pnode = find_nodebyname(node_name);

  if (pnode != NULL)
    {
    insert_thing(pnode->nd_ms_jobs, strdup(pjob->ji_qs.ji_jobid));

    unlock_node(pnode, __func__, NULL, LOGLEVEL);
    }

  return(PBSE_NONE);
  } /* END add_to_ms_list() */




void free_alps_req_data_array(
    
  alps_req_data *ard_array,
  int            num_reqs)

  {
  int i;

  for (i = 0; i < num_reqs; i++)
    free_dynamic_string(ard_array[i].node_list);

  free(ard_array);
  } /* END free_alps_req_data_array() */




int add_multi_reqs_to_job(
    
  job           *pjob,
  int            num_reqs,
  alps_req_data *ard_array)

  {
  int             i;
  dynamic_string *attr_str;
  char            buf[MAXLINE];

  if (ard_array == NULL)
    return(PBSE_NONE);

  attr_str = ard_array[0].node_list;

  for (i = 0; i < num_reqs; i++)
    {
    if (i != 0)
      {
      append_char_to_dynamic_string(attr_str, '|');
      append_dynamic_string(attr_str, ard_array[i].node_list->str);
      }

    snprintf(buf, sizeof(buf), "*%d", ard_array[i].ppn);
    append_dynamic_string(attr_str, buf);
    }

  if (pjob->ji_wattr[JOB_ATR_multi_req_alps].at_val.at_str != NULL)
    free(pjob->ji_wattr[JOB_ATR_multi_req_alps].at_val.at_str);

  pjob->ji_wattr[JOB_ATR_multi_req_alps].at_val.at_str = strdup(attr_str->str);
  pjob->ji_wattr[JOB_ATR_multi_req_alps].at_flags |= ATR_VFLAG_SET;

  return(PBSE_NONE);
  } /* END add_multi_reqs_to_job() */

int free_hostinfo(

    std::vector<job_reservation_info *>  &host_info) /* O */

  {
  for (unsigned int i = 0; i < host_info.size(); i++)
    {
    job_reservation_info *host_single = host_info[i];

    if (host_single != NULL)
      free(host_single);
    }
  return(PBSE_NONE);
  }


/*
 * set_nodes() - Call node_spec() to allocate nodes then set them inuse.
 * Build list of allocated nodes to pass back in rtnlist.
 *      Return: PBS error code
 */

int set_nodes(

  job   *pjob,        /* I */
  char  *spec,        /* I */
  int    procs,       /* I */
  char **rtnlist,     /* O */
  char **rtnportlist, /* O */
  char  *FailHost,    /* O (optional,minsize=1024) */
  char  *EMsg)        /* O (optional,minsize=1024) */

  {
  std::vector<job_reservation_info *> host_info;
  std::stringstream                   exec_hosts;
  std::stringstream                   exec_ports;
/*  job_reservation_info *gpu_info  = NULL;
  job_reservation_info *mic_info  = NULL; */
  struct howl       *gpu_list = NULL;
  struct howl       *mic_list = NULL;

  int                i;
  int                rc;
  int                NCount = 0;
  short              newstate;

  char              *login_prop = NULL;
  char              *gpu_str = NULL;
  char              *mic_str = NULL;
  char               ProcBMStr[MAX_BM];
  char               log_buf[LOCAL_LOG_BUF_SIZE];
  node_job_add_info *naji = NULL;
  alps_req_data     *ard_array = NULL;
  int                num_reqs = 0;
  long               cray_enabled = FALSE; 
  enum job_types     job_type;

  int gpu_flags = 0;

  if (FailHost != NULL)
    FailHost[0] = '\0';

  if (EMsg != NULL)
    EMsg[0] = '\0';

  if (LOGLEVEL >= 3)
    {
    sprintf(log_buf, "allocating nodes for job %s with node expression '%.4000s'",
      pjob->ji_qs.ji_jobid,
      spec);

    log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf);
    }

  ProcBMStr[0] = '\0';
#ifdef GEOMETRY_REQUESTS
  get_bitmap(pjob,sizeof(ProcBMStr),ProcBMStr);
#endif /* GEOMETRY_REQUESTS */

  naji = (node_job_add_info *)calloc(1, sizeof(node_job_add_info));

  if (pjob->ji_wattr[JOB_ATR_login_prop].at_flags & ATR_VFLAG_SET)
    login_prop = pjob->ji_wattr[JOB_ATR_login_prop].at_val.at_str;

  /* allocate nodes */
  if ((i = node_spec(spec,
                     1,
                     1,
                     ProcBMStr,
                     FailHost,
                     naji,
                     EMsg,
                     login_prop,
                     &ard_array,
                     &num_reqs,
                     job_type)) == 0)
    {
    /* no resources located, request failed */
    if (EMsg != NULL)
      {
      sprintf(log_buf,
        "could not locate requested resources '%.4000s' (node_spec failed) %s",
        spec,
        EMsg);

      log_record(PBSEVENT_JOB,PBS_EVENTCLASS_JOB,pjob->ji_qs.ji_jobid,log_buf);
      }

    free_naji(naji);
    free_alps_req_data_array(ard_array, num_reqs);

    return(PBSE_RESCUNAV);
    }
  else if (i < 0)
    {
    /* request failed, corrupt request */
    log_err(PBSE_UNKNODE, __func__, "request failed, corrupt request");
    free_naji(naji);
    free_alps_req_data_array(ard_array, num_reqs);
    return(PBSE_UNKNODE);
    }
 
  get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled);
  if (cray_enabled == TRUE)
    {
    // JOB_TYPE_normal means no component from the Cray will be used
    if ((job_type != JOB_TYPE_normal) && 
        (naji->next != NULL))
      {
      pjob->ji_wattr[JOB_ATR_login_node_id].at_val.at_str = strdup(naji->node_name);
      pjob->ji_wattr[JOB_ATR_login_node_id].at_flags = ATR_VFLAG_SET;
      }
    }

  newstate = INUSE_JOB;

  if ((rc = build_hostlist_nodes_req(pjob,
                                     EMsg,
                                     spec,
                                     newstate,
                                     host_info,
                                     &gpu_list,
                                     &mic_list,
                                     naji,
                                     ProcBMStr)) != PBSE_NONE)
    {
    free_nodes(pjob);
    free_alps_req_data_array(ard_array, num_reqs);
    free_hostinfo(host_info);
    return(rc);
    }

  if ((rc = build_hostlist_procs_req(pjob, procs, newstate, host_info)) != PBSE_NONE)
    {
    free_nodes(pjob);
    free_alps_req_data_array(ard_array, num_reqs);
    free_hostinfo(host_info);
    return(rc);
    }

  if (host_info.empty() == true)
    {
    if (LOGLEVEL >= 1)
      {
      sprintf(log_buf, "no nodes can be allocated to job %s",
        pjob->ji_qs.ji_jobid);

      log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf);
      }

    if (EMsg != NULL)
      sprintf(EMsg, "no nodes can be allocated to job");
    
    free_alps_req_data_array(ard_array, num_reqs);

    free_hostinfo(host_info);
    return(PBSE_RESCUNAV);
    }  /* END if (host_info.size() == 0) */

  pjob->ji_qs.ji_svrflags |= JOB_SVFLG_HasNodes;  /* indicate has nodes */

  /* build list of allocated nodes, gpus, and ports */
  rc = translate_job_reservation_info_to_string(host_info, &NCount, exec_hosts, &exec_ports);
  if (rc != PBSE_NONE)
    {
    free_alps_req_data_array(ard_array, num_reqs);
    free_hostinfo(host_info);
    return(rc);
    }

  *rtnlist = strdup(exec_hosts.str().c_str());
  *rtnportlist = strdup(exec_ports.str().c_str());

  // JOB_TYPE_normal means no component from the Cray will be used
  if ((cray_enabled == TRUE) &&
      (job_type != JOB_TYPE_normal))
    {
    char *plus = strchr(*rtnlist, '+');

    /* only do this if there's more than one host in the host list */
    if (plus != NULL)
      {
      char *to_free = *rtnlist;

      *plus = '\0';
      *rtnlist = strdup(plus + 1);
      free(to_free);
      }
    }

  if (mic_list != NULL)
    {
    if ((rc = translate_howl_to_string(mic_list, EMsg, &NCount, &mic_str, NULL, FALSE)) != PBSE_NONE)
      {
      free_hostinfo(host_info);
      return(rc);
      }

    job_attr_def[JOB_ATR_exec_mics].at_free(
      &pjob->ji_wattr[JOB_ATR_exec_mics]);

    job_attr_def[JOB_ATR_exec_mics].at_decode(
      &pjob->ji_wattr[JOB_ATR_exec_mics],
      NULL,
      NULL,
      mic_str,
      0);

    free(mic_str);
    }

  if (gpu_list != NULL)
    {
    if ((rc = translate_howl_to_string(gpu_list, EMsg, &NCount, &gpu_str, NULL, FALSE)) != PBSE_NONE)
      {
      free_alps_req_data_array(ard_array, num_reqs);
      free_hostinfo(host_info);
      return(rc);
      }

    job_attr_def[JOB_ATR_exec_gpus].at_free(
      &pjob->ji_wattr[JOB_ATR_exec_gpus]);
    
    job_attr_def[JOB_ATR_exec_gpus].at_decode(
      &pjob->ji_wattr[JOB_ATR_exec_gpus],
      NULL,
      NULL,
      gpu_str,
      0);  /* O */
    
    free(gpu_str);

    if (gpu_mode_rqstd != -1)
      gpu_flags = gpu_mode_rqstd;
    if (gpu_err_reset)
      gpu_flags += 1000;

    if (gpu_flags >= 0)
      {
      pjob->ji_wattr[JOB_ATR_gpu_flags].at_val.at_long = gpu_flags;
      pjob->ji_wattr[JOB_ATR_gpu_flags].at_flags = ATR_VFLAG_SET | ATR_VFLAG_MODIFY;
      
      if (LOGLEVEL >= 7)
        {
        sprintf(log_buf, "setting gpu_flags for job %s to %d %ld",
          pjob->ji_qs.ji_jobid,
          gpu_flags,
          pjob->ji_wattr[JOB_ATR_gpu_flags].at_val.at_long);

        log_ext(-1, __func__, log_buf, LOG_DEBUG);
        }
      }
    }

  if (LOGLEVEL >= 3)
    {
    snprintf(log_buf, sizeof(log_buf), "job %s allocated %d nodes (nodelist=%.4000s)",
      pjob->ji_qs.ji_jobid,
      NCount,
      *rtnlist);

    log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf);
    }

  add_multi_reqs_to_job(pjob, num_reqs, ard_array);
  free_alps_req_data_array(ard_array, num_reqs);
  free_hostinfo(host_info);

  /* SUCCESS */

  return(PBSE_NONE);
  }  /* END set_nodes() */




/* count the number of requested processors in a node spec
 * return processors requested on success
 * return -1 on error 
 */ 
int procs_requested(
    
  char *spec)

  {
  char        *str;
  char        *globs;
  char        *cp;
  char        *hold;
  int          num_nodes = 0;
  int          num_procs = 1;
  int          total_procs = 0;
  int          num_gpus = 0;
  int          num_mics = 0;
  int          i;
  struct prop *prop = NULL;
  char        *tmp_spec;
  char         log_buf[LOCAL_LOG_BUF_SIZE];

  tmp_spec = strdup(spec);  
  
  if (tmp_spec == NULL)
    {
    /* FAILURE */

    sprintf(log_buf,"cannot alloc memory");

    if (LOGLEVEL >= 1)
      {
      log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf);
      }

    return(-2);
    }

  /* Check to see if we have a global modifier */
  if ((globs = strchr(tmp_spec, '#')) != NULL)
    {
    *globs++ = '\0';

    globs = strdup(globs);

    while ((cp = strrchr(globs, '#')) != NULL)
      {
      *cp++ = '\0';

      hold = mod_spec(spec, cp);
      
      free(tmp_spec);
      
      tmp_spec = hold;
      }

    hold = mod_spec(tmp_spec, globs);
    
    free(tmp_spec);
    
    tmp_spec = hold;

    free(globs);
    }  /* END if ((globs = strchr(spec,'#')) != NULL) */

  str = tmp_spec;

  do
    {
    if ((i = number(&str, &num_nodes)) == -1)
      {
      free(tmp_spec);
      /* Bad string syntax. Fail */
      return(-1);
      }

    if (i == 0)
      {
      /* number exists */
      if (*str == ':')
        {
        /* there are properties */

        str++;

        if (proplist(&str, &prop, &num_procs, &num_gpus, &num_mics))
          {
          free(tmp_spec);
          return(-1);
          }
        }
      }
    else
      {
      /* no number */
      num_nodes = 1;
      if (proplist(&str, &prop, &num_procs, &num_gpus, &num_mics))
        {
        /* must be a prop list with no number in front */
        free(tmp_spec);

        return(-1);
        }
      }
    total_procs += num_procs * num_nodes;
    } while(*str++ == '+');
  
  free(tmp_spec);
  
  return(total_procs);
  } /* END procs_requested() */





/*
 * node_avail_complex -
 * *navail is set to number available
 * *nalloc is set to number allocated
 * *nresvd is set to number reserved
 * *ndown  is set to number down/offline
 *      return -1 on failure
 */

int node_avail_complex(

  char *spec,   /* I - node spec */
  int  *navail, /* O - number available */
  int  *nalloc, /* O - number allocated */
  int  *nresvd, /* O - number reserved  */
  int  *ndown)  /* O - number down      */

  {
  int            ret;
  enum job_types job_type;

  ret = node_spec(spec, 1, 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, job_type);

  *navail = ret;
  *nalloc = 0;
  *nresvd = 0;
  *ndown  = 0;

  return(ret);
  }  /* END node_avail_complex() */





/*
 * node_avail - report if nodes requested are available
 *
 * Return 0 when no error in request and
 *  *navail is set to number available
 *  *nalloc is set to number allocated
 *  *nresvd is set to number reserved
 *  *ndown  is set to number down/offline
 *      !=0 error number when error in request
 */

int node_avail(

  char *spec,  /* I  - node spec */
  int  *navail, /* O - number available */
  int *nalloc, /* O - number allocated */
  int *nresvd, /* O - number reserved  */
  int *ndown)  /* O - number down      */

  {
  int             j;
  int             holdnum;

  struct pbsnode *pn;
  char           *pc;

  struct prop    *prop = NULL;
  register int    xavail;
  register int    xalloc;
  register int    xresvd;
  register int    xdown;
  int             node_req = 1;
  int             gpu_req = 0;
  int             mic_req = 0;

  node_iterator   iter;

  if (spec == NULL)
    {
    log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, __func__, "no spec");

    return(RM_ERR_NOPARAM);
    }

  pc = spec;

  if ((strchr(spec, (int)'+') == NULL) && (number(&pc, &holdnum) == 1))
    {
    /* A simple node spec - reply with numbers of avaiable, */
    /* allocated, reserved, and down nodes that match the */
    /* the spec, null or simple number means all  */

    xavail = 0;
    xalloc = 0;
    xresvd = 0;
    xdown  = 0;

    /* find number of a specific type of node */

    if (*pc)
      {
      if (proplist(&pc, &prop, &node_req, &gpu_req, &mic_req))
        {
        return(RM_ERR_BADPARAM);
        }
      }

    reinitialize_node_iterator(&iter);
    pn = NULL;

    while ((pn = next_node(&allnodes, pn, &iter)) != NULL)
      {
      if ((pn->nd_ntype == NTYPE_CLUSTER) && hasprop(pn, prop))
        {
        if (pn->nd_state & (INUSE_OFFLINE | INUSE_DOWN))
          ++xdown;
        else if (hasppn(pn, node_req, SKIP_ANYINUSE))
          ++xavail;
        else if (hasppn(pn, node_req, SKIP_NONE))
          {
          /* node has enough processors, are they busy or reserved? */
          j = pn->nd_slots.get_total_execution_slots() - pn->nd_np_to_be_used;
          
          if (j >= node_req)
            ++xresvd;
          else
            ++xalloc;
          }
        }
      } /* END for each node */

    free_prop(prop);

    *navail = xavail;

    *nalloc = xalloc;

    *nresvd = xresvd;

    *ndown  = xdown;

    return(0);
    }
  else if (number(&pc, &holdnum) == -1)
    {
    /* invalid spec */

    return(RM_ERR_BADPARAM);
    }

  /* not a simple spec - determine if supplied complex */
  /* node spec can be satisified from avail nodes */
  /* navail set to >0 if can be satified now  */
  /*    0 if not now but possible  */
  /*   -l if never possible   */

  node_avail_complex(spec, navail, nalloc, nresvd, ndown);

  return(0);
  }  /* END node_avail() */




/*
 * node_reserve - Reserve nodes
 *
 * Returns: >0 - reservation succeeded, number of nodes reserved
 *    0 - None or partial reservation
 *   -1 - requested reservation impossible
 */

int node_reserve(

  char       *nspec, /* In     - a node specification */
  resource_t  tag)   /* In/Out - tag for resource if reserved */

  {

  struct pbsnode    *pnode;
  int                ret_val;

  node_iterator      iter;
  char               log_buf[LOCAL_LOG_BUF_SIZE];
  node_job_add_info  *naji = NULL;
  enum job_types      job_type;

  DBPRT(("%s: entered\n", __func__))

  if ((nspec == NULL) || (*nspec == '\0'))
    {
    log_event(PBSEVENT_ADMIN, PBS_EVENTCLASS_SERVER, __func__, "no spec");

    return(-1);
    }

  naji = (node_job_add_info *)calloc(1, sizeof(node_job_add_info));

  if ((ret_val = node_spec(nspec, 0, 0, NULL, NULL, naji, NULL, NULL, NULL, NULL, job_type)) >= 0)
    {
    /*
    ** Zero or more of the needed Nodes are available to be
    ** reserved.
    */
    reinitialize_node_iterator(&iter);
    pnode = NULL;

    while ((pnode = next_node(&allnodes,pnode,&iter)) != NULL)
      {
      if (pnode->nd_flag != thinking)
        {
        continue;   /* skip this one */
        }


      if (pnode->nd_np_to_be_used == pnode->nd_slots.get_total_execution_slots())
        pnode->nd_state |= INUSE_RESERVE;
      } /* END for each node */
    }
  else
    {
    /* could never satisfy the reservation */

    snprintf(log_buf, sizeof(log_buf), "can never reserve %s", nspec);

    log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf);
    }

  free_naji(naji);

  return(ret_val);
  }  /* END node_reserve() */




char *get_next_exec_host(

  char **current)

  {
  char *name_ptr = *current;
  char *plus;
  char *slash;
  
  if (name_ptr != NULL)
    {
    if ((plus = strchr(name_ptr, '+')) != NULL)
      {
      *current = plus + 1;
      *plus = '\0';
      }
    else
      *current = NULL;

    if ((slash = strchr(name_ptr, '/')) != NULL)
      *slash = '\0';
    }

  return(name_ptr);
  } /* END get_next_exec_host() */



int remove_job_from_nodes_gpus(

  struct pbsnode *pnode,
  job            *pjob)

  {
  struct gpusubn *gn;
  char           *gpu_str = NULL;
  int             i;
  char            log_buf[LOCAL_LOG_BUF_SIZE];
  char            tmp_str[PBS_MAXHOSTNAME + 10];
  char            num_str[6];
 
  if (pjob->ji_wattr[JOB_ATR_exec_gpus].at_flags & ATR_VFLAG_SET)
    gpu_str = pjob->ji_wattr[JOB_ATR_exec_gpus].at_val.at_str;

  if (gpu_str != NULL)
    {
    /* reset gpu nodes */
    for (i = 0; i < pnode->nd_ngpus; i++)
      {
      gn = pnode->nd_gpusn + i;
      
      if (pnode->nd_gpus_real)
        {
        /* reset real gpu nodes */
        strcpy (tmp_str, pnode->nd_name);
        strcat (tmp_str, "-gpu/");
        sprintf (num_str, "%d", i);
        strcat (tmp_str, num_str);
        
        /* look thru the string and see if it has this host and gpuid.
         * exec_gpus string should be in format of 
         * <hostname>-gpu/<index>[+<hostname>-gpu/<index>...]
         *
         * if we are using the gpu node exclusively or if shared mode and
         * this is last job assigned to this gpu then set it's state
         * unallocated so its available for a new job. Takes time to get the
         * gpu status report from the moms.
         */
        
        if (strstr(gpu_str, tmp_str) != NULL)
          {
          gn->job_count--;
          
          if ((gn->mode == gpu_exclusive_thread) ||
              (gn->mode == gpu_exclusive_process) ||
              ((gn->mode == gpu_normal) && 
               (gn->job_count == 0)))
            {
            gn->state = gpu_unallocated;
            
            if (LOGLEVEL >= 7)
              {
              sprintf(log_buf, "freeing node %s gpu %d for job %s",
                pnode->nd_name,
                i,
                pjob->ji_qs.ji_jobid);
              
              log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf);
              }
            
            }
          }
        }
      else
        {
        if (!strcmp(gn->jobid, pjob->ji_qs.ji_jobid))
          {
          gn->inuse = FALSE;
          memset(gn->jobid, 0, sizeof(gn->jobid));
          
          pnode->nd_ngpus_free++;
          }
        }
      }
    }

  return(PBSE_NONE);
  } /* END remove_job_from_nodes_gpus() */




int remove_job_from_node(

  struct pbsnode *pnode,
  const char     *jobid)

  {
  char log_buf[LOCAL_LOG_BUF_SIZE];

  for (int i = 0; i < (int)pnode->nd_job_usages.size(); i++)
    {
    job_usage_info *jui = pnode->nd_job_usages[i];

    if (!strcmp(jui->jobid, jobid))
      {
      pnode->nd_slots.unreserve_execution_slots(jui->est);
      pnode->nd_job_usages.erase(pnode->nd_job_usages.begin() + i);

      if (LOGLEVEL >= 6)
        {
        sprintf(log_buf, "increased execution slot free count to %d of %d\n",
          pnode->nd_slots.get_number_free(),
          pnode->nd_slots.get_total_execution_slots());
        
        log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf);
        }

      pnode->nd_state &= ~INUSE_JOB;
      delete jui;

      break;
      }
    }
  
  return(PBSE_NONE);
  } /* END remove_job_from_node() */




/*
 * free_nodes - free nodes allocated to a job
 */

void free_nodes(

  job *pjob)  /* I (modified) */

  {
  struct pbsnode *pnode;

  char            log_buf[LOCAL_LOG_BUF_SIZE];
  char           *exec_hosts = NULL;
  char           *host_ptr = NULL;
  char           *hostname;
  char           *previous_hostname = NULL;

  if (LOGLEVEL >= 3)
    {
    sprintf(log_buf, "freeing nodes for job %s", pjob->ji_qs.ji_jobid);

    log_record(PBSEVENT_SCHED, PBS_EVENTCLASS_REQUEST, __func__, log_buf);
    }

  if (pjob->ji_wattr[JOB_ATR_exec_host].at_flags & ATR_VFLAG_SET)
    {
    if (pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str != NULL)
      {
      exec_hosts = strdup(pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str);
      host_ptr = exec_hosts;
      }
    }

  while ((hostname = get_next_exec_host(&host_ptr)) != NULL)
    {
    if ((previous_hostname) != NULL)
      {
      if (!strcmp(hostname, previous_hostname))
        continue;
      }

    previous_hostname = hostname;

    if ((pnode = find_nodebyname(hostname)) != NULL)
      {
      remove_job_from_node(pnode, pjob->ji_qs.ji_jobid);
      remove_job_from_nodes_gpus(pnode, pjob);
      remove_job_from_nodes_mics(pnode, pjob);
      unlock_node(pnode, __func__, NULL, LOGLEVEL);
      }
    }

  free(exec_hosts);

  if (pjob->ji_wattr[JOB_ATR_login_node_id].at_val.at_str != NULL)
    {
    if ((pnode = find_nodebyname(pjob->ji_wattr[JOB_ATR_login_node_id].at_val.at_str)) != NULL)
      {
      remove_job_from_node(pnode, pjob->ji_qs.ji_jobid);
      unlock_node(pnode, __func__, NULL, LOGLEVEL);
      }
    }

  pjob->ji_qs.ji_svrflags &= ~JOB_SVFLG_HasNodes;

  return;
  }  /* END free_nodes() */




struct pbsnode *get_compute_node(

  char *node_name)

  {
  struct pbsnode *ar = alps_reporter;
  struct pbsnode *compute_node = NULL;
  unsigned int    i;
  unsigned int    len = strlen(node_name);

  for (i = 0; i < len; i++)
    {
    if (isdigit(node_name[i]) == FALSE)
      {
      /* found a non-numeric character - not a compute node */
      return(NULL);
      }
    }

  lock_node(ar, __func__, NULL, LOGLEVEL);
  compute_node = create_alps_subnode(ar, node_name);
  unlock_node(ar, __func__, NULL, LOGLEVEL);

  return(compute_node);
  } /* END get_compute_node() */




/*
 * set_one_old - set a named node as allocated to a job
 */

void set_one_old(

  char *name,
  job  *pjob)

  {
  int             index;

  struct pbsnode *pnode;
  char           *pc;
  long            cray_enabled = FALSE;

  if ((pc = strchr(name, (int)'/')))
    {
    index = atoi(pc + 1);

    *pc = '\0';
    }
  else
    {
    index = 0;
    }

  get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled);

  pnode = find_nodebyname(name);

  if (cray_enabled == TRUE)
    {
    if (pnode == NULL)
      pnode = get_compute_node(name);

    if (pnode != NULL)
      {
      if (pnode->parent == alps_reporter)
        {
        while (index >= pnode->nd_slots.get_total_execution_slots())
          {
          add_execution_slot(pnode);
          }
        }
      }
    }

  if (pnode != NULL)
    {
    bool found = false;
    /* Mark node as being IN USE ...  */

    for (int i = 0; i < (int)pnode->nd_job_usages.size(); i++)
      {
      job_usage_info *jui = pnode->nd_job_usages[i];

      if (!strcmp(jui->jobid, pjob->ji_qs.ji_jobid))
        {
        found = true;

        while (index >= jui->est.get_total_execution_slots())
          jui->est.add_execution_slot();

        jui->est.mark_as_used(index);
        pnode->nd_slots.mark_as_used(index);
        }
      }

    if (found == false)
      {
      job_usage_info *jui = new job_usage_info(pjob->ji_qs.ji_jobid);
        
      while (index >= jui->est.get_total_execution_slots())
        jui->est.add_execution_slot();

      jui->est.mark_as_used(index);
      pnode->nd_slots.mark_as_used(index);
      pnode->nd_job_usages.push_back(jui);
      }

    if (pnode->nd_slots.get_number_free() <= 0)
      pnode->nd_state |= INUSE_JOB;

    unlock_node(pnode, __func__, NULL, LOGLEVEL);
    }

  return;
  }  /* END set_one_old() */





/*
 * set_old_nodes - set "old" nodes as in use - called from pbsd_init()
 * when recovering a job in the running state.
 */

void set_old_nodes(

  job *pjob)  /* I (modified) */

  {
  char     *old;
  char     *po;
  long      cray_enabled = FALSE;

  if (pjob->ji_wattr[JOB_ATR_exec_host].at_flags & ATR_VFLAG_SET)
    {
    old = strdup(pjob->ji_wattr[JOB_ATR_exec_host].at_val.at_str);

    if (old == NULL)
      {
      /* FAILURE - cannot alloc memory */

      return;
      }

    while ((po = strrchr(old, (int)'+')) != NULL)
      {
      *po++ = '\0';

      set_one_old(po, pjob);
      }

    set_one_old(old, pjob);

    free(old);
    } /* END if pjobs exec host is set */

  /* record the job on the alps_login if cray_enabled */
  get_svr_attr_l(SRV_ATR_CrayEnabled, &cray_enabled);
  if ((cray_enabled == TRUE) &&
      (pjob->ji_wattr[JOB_ATR_login_node_id].at_flags & ATR_VFLAG_SET))
    {
    set_one_old(pjob->ji_wattr[JOB_ATR_login_node_id].at_val.at_str, pjob);
    }

  return;
  }  /* END set_old_nodes() */

    

job *get_job_from_job_usage_info(
    
  job_usage_info *jui,
  struct pbsnode *pnode)

  {
  job *pjob;

  unlock_node(pnode, __func__, NULL, LOGLEVEL);
  pjob = svr_find_job(jui->jobid, TRUE);
  lock_node(pnode, __func__, NULL, LOGLEVEL);

  return(pjob);
  }


  
job *get_job_from_jobinfo(
    
  struct jobinfo *jp,
  struct pbsnode *pnode)
  
  {
  job *pjob;

  unlock_node(pnode, __func__, NULL, LOGLEVEL);
  pjob = svr_find_job(jp->jobid, TRUE);
  lock_node(pnode, __func__, NULL, LOGLEVEL);

  return(pjob);
  } /* END get_job_from_jobinfo() */


/* END node_manager.c */