XRootD
XrdCmsClientMan.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d C m s C l i e n t M a n . c c */
4 /* */
5 /* (c) 2007 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <ctime>
32 
35 #include "XrdCms/XrdCmsLogin.hh"
36 #include "XrdCms/XrdCmsTrace.hh"
37 
39 
40 #include "XrdSys/XrdSysError.hh"
41 #include "XrdSys/XrdSysTimer.hh"
42 
43 #include "Xrd/XrdInet.hh"
44 #include "Xrd/XrdLink.hh"
45 
46 using namespace XrdCms;
47 
48 /******************************************************************************/
49 /* G l o b a l s */
50 /******************************************************************************/
51 
52 XrdOucBuffPool XrdCmsClientMan::BuffPool(XrdOucEI::Max_Error_Len, 65536, 1, 16);
53 
54 XrdInet *XrdCmsClientMan::Network = 0;
55 
57 
58 const char *XrdCmsClientMan::ConfigFN = 0;
59 
60 XrdSysMutex XrdCmsClientMan::manMutex;
61 
62 /******************************************************************************/
63 /* C o n s t r u c t o r */
64 /******************************************************************************/
65 
66 XrdCmsClientMan::XrdCmsClientMan(char *host, int port,
67  int cw, int nr, int rw, int rd)
68  : syncResp(0)
69 {
70  static XrdSysMutex initMutex;
71  static int Instance = 0;
72  char *dot;
73 
74  Host = strdup(host);
75  if ((dot = index(Host, '.')))
76  {*dot = '\0'; HPfx = strdup(Host); *dot = '.';}
77  else HPfx = strdup(Host);
78  Port = port;
79  Link = 0;
80  Active = 0;
81  Silent = 0;
82  Suspend = 1;
83  RecvCnt = 0;
84  nrMax = nr;
85  NetBuff = BuffPool.Alloc(XrdOucEI::Max_Error_Len);
86  repWMax = rw;
87  repWait = 0;
88  minDelay= rd;
89  maxDelay= rd*3;
90  chkCount= chkVal;
91  lastUpdt= lastTOut = time(0);
92  Next = 0;
93  manInst = 1;
94 
95 // Compute dally value
96 //
97  dally = cw / 2 - 1;
98  if (dally < 3) dally = 3;
99  else if (dally > 10) dally = 10;
100 
101 // Provide a unique mask number for this manager
102 //
103  initMutex.Lock();
104  manMask = 1<<Instance++;
105  initMutex.UnLock();
106 }
107 
108 /******************************************************************************/
109 /* D e s t r u c t o r */
110 /******************************************************************************/
111 
113 {
114  if (Link) Link->Close();
115  if (Host) free(Host);
116  if (HPfx) free(HPfx);
117  if (NetBuff) NetBuff->Recycle();
118 }
119 
120 /******************************************************************************/
121 /* d e l a y R e s p */
122 /******************************************************************************/
123 
125 {
126  XrdCmsResp *rp;
127  int msgid;
128 
129 // Obtain the message ID
130 //
131  if (!(msgid = Resp.getErrInfo()))
132  {Say.Emsg("Manager", Host, "supplied invalid waitr msgid");
133  Resp.setErrInfo(EILSEQ, "redirector protocol error");
134  syncResp.Post();
135  return SFS_ERROR;
136  }
137 
138 // Allocate a delayed response object
139 //
140  if (!(rp = XrdCmsResp::Alloc(&Resp, msgid)))
141  {Say.Emsg("Manager",ENOMEM,"allocate resp object for",Resp.getErrUser());
142  Resp.setErrInfo(0, "0");
143  syncResp.Post();
144  return SFS_STALL;
145  }
146 
147 // Add this object to our delayed response queue. If the manager bounced then
148 // purge all of the pending repsonses to avoid sending wrong ones.
149 //
150  if (msgid < maxMsgID) RespQ.Purge();
151  maxMsgID = msgid;
152  RespQ.Add(rp);
153 
154 // Tell client to wait for response. The semaphore post allows the manager
155 // to get the next message from the cmsd. This prevents us from getting the
156 // delayed response before the response object is added to the queue.
157 //
158  Resp.setErrInfo(0, "");
159  syncResp.Post();
160  return SFS_STARTED;
161 }
162 
163 /******************************************************************************/
164 /* S e n d */
165 /******************************************************************************/
166 
167 int XrdCmsClientMan::Send(unsigned int &iMan, char *msg, int mlen)
168 {
169  int allok = 0;
170 
171 // Determine message length
172 //
173  if (!mlen) mlen = strlen(msg);
174 
175 // Send the request
176 //
177  myData.Lock();
178  iMan = manInst;
179  if (Active)
180  {if (Link)
181  {if (!(allok = Link->Send(msg, mlen) > 0))
182  {Active = 0;
183  Link->Close(1);
184  manInst++;
185  } else SendCnt++;
186  }
187  }
188  myData.UnLock();
189 
190 // All done
191 //
192  return allok;
193 }
194 
195 /******************************************************************************/
196 
197 int XrdCmsClientMan::Send(unsigned int &iMan, const struct iovec *iov, int iovcnt, int iotot)
198 {
199  int allok = 0;
200 
201 // Send the request
202 //
203  myData.Lock();
204  iMan = manInst;
205  if (Active)
206  {if (Link)
207  {if (!(allok = Link->Send(iov, iovcnt, iotot) > 0))
208  {Active = 0;
209  Link->Close(1);
210  manInst++;
211  } else SendCnt++;
212  }
213  }
214  myData.UnLock();
215 
216 // All done
217 //
218  return allok;
219 }
220 
221 /******************************************************************************/
222 /* S t a r t */
223 /******************************************************************************/
224 
226 {
227 
228 // First step is to connect to the manager
229 //
230  do {Hookup();
231  // Now simply start receiving messages on the stream. When we get a
232  // respwait reply then we must be assured that the object representing
233  // the request is added to the queue before the actual reply arrives.
234  // We do this by waiting on syncResp which is posted once the request
235  // object is fully processed. The actual response associated with the
236  // respwait is synchronized during the callback phase since the client
237  // must receive the respwait before the subsequent response.
238  //
239  while(Receive())
240  if (Response.modifier & CmsResponse::kYR_async) relayResp();
241  else if (Response.rrCode == kYR_status) setStatus();
242  else if (XrdCmsClientMsg::Reply(HPfx, Response, NetBuff))
243  {if (Response.rrCode == kYR_waitresp) syncResp.Wait();}
244 
245  // Tear down the connection
246  //
247  myData.Lock();
248  if (Link) {Link->Close(); Link = 0;}
249  Active = 0; Suspend = 1;
250  myData.UnLock();
251 
252  // Indicate the problem
253  //
254  Say.Emsg("ClientMan", "Disconnected from", Host);
255  XrdSysTimer::Snooze(dally);
256  } while(1);
257 
258 // We should never get here
259 //
260  return (void *)0;
261 }
262 
263 /******************************************************************************/
264 /* w h a t s U p */
265 /******************************************************************************/
266 
267 int XrdCmsClientMan::whatsUp(const char *user, const char *path,
268  unsigned int iMan)
269 {
270  EPNAME("whatsUp");
271  unsigned int xMan;
272  int theDelay, inQ;
273  bool lClose = false;
274 
275 // The cmsd did not respond. Increase silent count and see if restart is needed
276 // Otherwise, increase the wait interval just in case things are just slow.
277 //
278  myData.Lock();
279  if (Active)
280  {if (Active == RecvCnt)
281  {if ((time(0)-lastTOut) >= repWait)
282  {Silent++;
283  if (Silent > nrMax)
284  {Active = 0; Silent = 0; Suspend = 1;
285  if (Link && iMan == manInst)
286  {Link->Close(1);
287  manInst++; lClose = true;
288  }
289  } else if (Silent & 0x02 && repWait < repWMax) repWait++;
290  }
291  } else {Active = RecvCnt; Silent = 0; lastTOut = time(0);}
292  }
293 
294 // Calclulate how long to delay the client. This will be based on the number
295 // of outstanding requests bounded by the config delay value.
296 //
297  inQ = XrdCmsClientMsg::inQ();
298  theDelay = inQ * qTime;
299  xMan = manInst;
300  myData.UnLock();
301  theDelay = theDelay/1000 + (theDelay % 1000 ? 1 : 0);
302  if (theDelay < minDelay) theDelay = minDelay;
303  if (theDelay > maxDelay) theDelay = maxDelay;
304 
305 // Do Some tracing here
306 //
307  TRACE(Redirect, user <<" no resp from inst " <<iMan <<" of "<<HPfx
308  <<" in " <<repWait
309  <<" inst " <<xMan <<(lClose ? " closed" : " steady")
310  <<"; inQ " <<inQ <<" delay " <<theDelay <<" path=" <<path);
311  return theDelay;
312 }
313 
314 /******************************************************************************/
315 /* P r i v a t e M e t h o d s */
316 /******************************************************************************/
317 /******************************************************************************/
318 /* H o o k u p */
319 /******************************************************************************/
320 
321 int XrdCmsClientMan::Hookup()
322 {
323  EPNAME("Hookup");
324  CmsLoginData Data;
325  XrdLink *lp;
326  char buff[256], hnBuff[264];
327  kXR_char *envData = 0;
328  int rc, oldWait, tries = 12, opts = 0;
329 
330 // Turn off our debugging and version flags
331 //
332  manMutex.Lock();
333  doDebug &= ~manMask;
334  manMutex.UnLock();
335 
336 // Report our hostname (there are better ways of doing this)
337 //
338  const char *hn = getenv("XRDHOST");
339  if (hn)
340  {snprintf(hnBuff, sizeof(hnBuff), "myHN=%s", hn);
341  envData = (kXR_char *)hnBuff;
342  }
343 
344 // Keep trying to connect to the manager. Note that we bind the link to this
345 // thread to make sure we get notified should another thread close the socket.
346 //
347  do {while(!(lp = Network->Connect(Host, Port, opts)))
348  {XrdSysTimer::Snooze(dally);
349  if (tries--) opts = XRDNET_NOEMSG;
350  else {opts = 0; tries = 12;}
351  continue;
352  }
353 // lp->Bind(XrdSysThread::ID());
354  memset(&Data, 0, sizeof(Data));
355  Data.envCGI = envData;
356  Data.Mode = CmsLoginData::kYR_director;
357  Data.HoldTime = static_cast<int>(getpid());
358  if (!(rc = XrdCmsLogin::Login(lp, Data))) break;
359  lp->Close();
360  XrdSysTimer::Snooze(dally);
361  } while(1);
362 
363 // Establish global state
364 //
365  manMutex.Lock();
366  doDebug |= (Data.Mode & CmsLoginData::kYR_debug ? manMask : 0);
367  manMutex.UnLock();
368 
369 // All went well, finally
370 //
371  myData.Lock();
372  Link = lp;
373  Active = 1;
374  Silent = 0;
375  RecvCnt = 1;
376  SendCnt = 1;
377  Suspend = (Data.Mode & CmsLoginData::kYR_suspend);
378 
379 // Calculate how long we will wait for replies before delaying the client.
380 // This is computed dynamically based on the expected response window.
381 //
382  if ((oldWait = (repWait*20/100)) < 2) oldWait = 2;
383  if (Data.HoldTime > repWMax*1000) repWait = repWMax;
384  else if (Data.HoldTime <= 0) repWait = repWMax;
385  else {repWait = Data.HoldTime*3;
386  repWait = (repWait/1000) + (repWait % 1000 ? 1 : 0);
387  if (repWait > repWMax) repWait = repWMax;
388  else if (repWait < oldWait) repWait = oldWait;
389  }
390  qTime = (Data.HoldTime < 100 ? 100 : Data.HoldTime);
391  lastTOut = time(0);
392  myData.UnLock();
393 
394 // Tell the world
395 //
396  sprintf(buff, "v %d", Data.Version);
397  Say.Emsg("ClientMan", (Suspend ? "Connected to suspended" : "Connected to"),
398  Host, buff);
399  DEBUG(Host <<" qt=" <<qTime <<"ms rw=" <<repWait);
400  return 1;
401 }
402 
403 /******************************************************************************/
404 /* R e c e i v e */
405 /******************************************************************************/
406 
407 int XrdCmsClientMan::Receive()
408 {
409 // This method is always run out of the object's main thread. Other threads
410 // may call methods that initiate a link reset via a deferred close. We will
411 // notice that here because the file descriptor will be closed. This will
412 // cause us to return an error and precipitate a connection teardown.
413 //
414  EPNAME("Receive")
415  if (Link->RecvAll((char *)&Response, sizeof(Response)) > 0)
416  {int dlen = static_cast<int>(ntohs(Response.datalen));
417  RecvCnt++;
418  DEBUG(Link->Name() <<' ' <<dlen <<" bytes on " <<Response.streamid);
419  if (!dlen) return 1;
420  if ((dlen > NetBuff->BuffSize())
421  && (Response.rrCode != kYR_data || !NetBuff->Resize(dlen)))
422  Say.Emsg("ClientMan", "Excessive msg length from", Host);
423  else {NetBuff->SetLen(dlen);
424  return Link->RecvAll(NetBuff->Buffer(), dlen);
425  }
426  }
427  return 0;
428 }
429 
430 /******************************************************************************/
431 /* r e l a y R e s p */
432 /******************************************************************************/
433 
434 void XrdCmsClientMan::relayResp()
435 {
436  EPNAME("relayResp");
437  XrdCmsResp *rp;
438 
439 // Remove the response object from our queue.
440 //
441  if (!(rp = RespQ.Rem(Response.streamid)))
442  {DEBUG(Host <<" replied to non-existent request; id=" <<Response.streamid);
443  return;
444  }
445 
446 // Queue the request for reply (this transfers the network buffer)
447 //
448  rp->Reply(HPfx, Response, NetBuff);
449 
450 // Obtain a new network buffer
451 //
452  NetBuff = BuffPool.Alloc(XrdOucEI::Max_Error_Len);
453 }
454 
455 /******************************************************************************/
456 /* Private: c h k S t a t u s */
457 /******************************************************************************/
458 
459 int XrdCmsClientMan::chkStatus()
460 {
461  static CmsUpdateRequest Updt = {{0, kYR_update, 0, 0}};
462  XrdSysMutexHelper mdMon(myData);
463  time_t nowTime;
464 
465 // Count down the query count and ask again every 30 seconds
466 //
467  if (!chkCount--)
468  {chkCount = chkVal;
469  nowTime = time(0);
470  if ((nowTime - lastUpdt) >= 30)
471  {lastUpdt = nowTime;
472  if (Active) Link->Send((char *)&Updt, sizeof(Updt));
473  }
474  }
475  return Suspend;
476 }
477 
478 /******************************************************************************/
479 /* s e t S t a t u s */
480 /******************************************************************************/
481 
482 void XrdCmsClientMan::setStatus()
483 {
484  EPNAME("setStatus");
485  const char *State = 0, *Event = "?";
486 
487 
488  myData.Lock();
489  if (Response.modifier & CmsStatusRequest::kYR_Suspend)
490  {Event = "suspend";
491  if (!Suspend) {Suspend = 1; State = "suspended";}
492  }
493  else if (Response.modifier & CmsStatusRequest::kYR_Resume)
494  {Event = "resume";
495  if (Suspend) {Suspend = 0; State = "resumed";}
496  }
497  myData.UnLock();
498 
499  DEBUG(Host <<" sent " <<Event <<" event");
500  if (State) Say.Emsg("setStatus", "Manager", Host, State);
501 }
unsigned char kXR_char
Definition: XPtypes.hh:65
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define EPNAME(x)
Definition: XrdBwmTrace.hh:56
#define XRDNET_NOEMSG
Definition: XrdNetOpts.hh:71
struct myOpts opts
#define SFS_ERROR
#define SFS_STALL
#define SFS_STARTED
if(Avsz)
#define TRACE(act, x)
Definition: XrdTrace.hh:63
int Send(unsigned int &iMan, char *msg, int mlen=0)
int delayResp(XrdOucErrInfo &Resp)
int whatsUp(const char *user, const char *path, unsigned int iMan)
static char doDebug
XrdCmsClientMan(char *host, int port, int cw, int nr, int rw, int rd)
static int inQ()
static int Reply(const char *Man, XrdCms::CmsRRHdr &hdr, XrdOucBuffer *buff)
static int Login(XrdLink *Link, XrdCms::CmsLoginData &Data, int timeout=-1)
Definition: XrdCmsLogin.cc:125
XrdCmsResp * Rem(int msgid)
Definition: XrdCmsResp.cc:282
void Add(XrdCmsResp *rp)
Definition: XrdCmsResp.cc:250
void Purge()
Definition: XrdCmsResp.cc:267
void Reply(const char *Man, XrdCms::CmsRRHdr &rrhdr, XrdOucBuffer *netbuff)
Definition: XrdCmsResp.cc:138
static XrdCmsResp * Alloc(XrdOucErrInfo *erp, int msgid)
Definition: XrdCmsResp.cc:64
XrdLink * Connect(const char *host, int port, int opts=0, int timeout=-1)
Definition: XrdInet.cc:185
XrdOucBuffer * Alloc(int sz)
bool Resize(int newsz)
int BuffSize() const
char * Buffer() const
void Recycle()
Recycle the buffer. The buffer may be reused in the future.
void SetLen(int dataL, int dataO=0)
int setErrInfo(int code, const char *emsg)
const char * getErrUser()
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
static void Snooze(int seconds)
Definition: XrdSysTimer.cc:168
kXR_char modifier
Definition: YProtocol.hh:85
@ kYR_data
Definition: YProtocol.hh:141
@ kYR_waitresp
Definition: YProtocol.hh:145
XrdSysError Say
kXR_char rrCode
Definition: YProtocol.hh:84
@ kYR_update
Definition: YProtocol.hh:115
@ kYR_status
Definition: YProtocol.hh:112
static const size_t Max_Error_Len