1  ''' 
  2  SSH-based coordination scheme between manager and agent 
  3  ''' 
  4  import urlparse 
  5  import pdb 
  6  import glob 
  7  import errno 
  8  import sys 
  9  import os 
 10  import stat 
 11  import logging 
 12  import traceback 
 13  import pexpect 
 14   
 15  sys.path.append(os.path.join(os.path.dirname(__file__), "../..")) 
 16  from pilot.api import State 
 17  from bigjob import logger 
 18   
 19  SSH_OPTS="-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o NumberOfPasswordPrompts=0" 
 20   
 21   
 23      """ BigData Coordination File Management for Pilot Store """ 
 24       
 25 -    def __init__(self, service_url, security_context=None, pilot_data_description=None):         
  26          self.service_url = service_url 
 27          result = urlparse.urlparse(service_url) 
 28          self.host = result.hostname 
 29          self.path = result.path         
 30          self.user = result.username 
 31           
 32          self.pilot_data_description=pilot_data_description 
 33           
 34           
 35          self.userkey=None 
 36          self.security_context=security_context 
 37            
 38           
 39          if self.pilot_data_description!=None and\ 
 40             self.pilot_data_description.has_key("userkey"): 
 41              self.userkey=self.pilot_data_description["userkey"] 
 42   
 43          logger.debug("Security Context: " + str(self.security_context)) 
 44               
 45           
 46          if security_context!=None and security_context!="None": 
 47              logger.debug("Attempt to restore SSH credentials from security context: " + str(security_context)) 
 48              security_context = eval(security_context) 
 49              key=security_context["userkey"]              
 50              self.userkey=os.path.join(os.getcwd(), ".ssh/id_rsa") 
 51              if os.path.exists(os.path.join(os.getcwd(),".ssh"))==False: 
 52                  os.makedirs(os.path.join(os.getcwd(), ".ssh")) 
 53              logger.debug("Write key: " + str(type(key)) + " to: " + self.userkey) 
 54              try: 
 55                  f = open(self.userkey, "w")  
 56                  for i in key: 
 57                      logger.debug("write: " + str(i)) 
 58                      f.write(i) 
 59                  f.close() 
 60                  os.chmod(self.userkey, 0600) 
 61              except: 
 62                  self.__print_traceback() 
  63           
 64   
 66          """ Returns security context that needs to be available on the distributed 
 67              node in order to access this Pilot Data """ 
 68          if (self.security_context==None or self.security_context=="None") and self.pilot_data_description.has_key("userkey"): 
 69              f = open(self.pilot_data_description["userkey"]) 
 70              key = f.readlines() 
 71              f.close 
 72              self.security_context = {"userkey":key} 
 73          logger.debug("Return security context: " + str(self.security_context))         
 74          return self.security_context 
  75           
 76           
 78           
 79          try: 
 80              command = "mkdir -p %s"%self.path  
 81              self.__run_ssh_command(self.userkey, self.user, self.host, command)       
 82          except IOError: 
 83              self.__print_traceback() 
 84               
 85          self.__state=State.Running 
  86           
 87           
 90       
 91       
 93          self.__remove_directory(self.path) 
 94          self.__state=State.Done 
  95       
 96           
 98          if self.__client.get_transport().is_active()==True: 
 99              return self.__state 
100          else: 
101              self.__state=State.Failed 
102              return self.__state             
 103               
105          du_dir = os.path.join(self.path, str(du_id)) 
106          logger.debug("/bin/date") 
107          command = "/bin/date" 
108          self.__run_ssh_command(self.userkey, self.user, self.host, command) 
109          logger.debug("mkdir: " + du_dir) 
110          command = "mkdir %s"%du_dir 
111          self.__run_ssh_command(self.userkey, self.user, self.host, command) 
 112           
113           
116                   
117   
119          logger.debug("Copy DU using SCP") 
120          du_items = du.list() 
121          for i in du_items.keys():      
122              local_filename = du_items[i]["local"] 
123              remote_path = os.path.join(self.path, str(du.id), os.path.basename(local_filename)) 
124              logger.debug("Put file: %s to %s"%(i, remote_path))                         
125              if local_filename.startswith("ssh://"): 
126                   
127                  if self.__is_remote_directory(local_filename): 
128                      logger.warning("Path %s is a directory. Ignored."%local_filename)                 
129                      continue 
130                  
131                   
132              else: 
133                  try: 
134                      if stat.S_ISDIR(os.stat(local_filename).st_mode): 
135                          logger.warning("Path %s is a directory. Ignored."%local_filename)                 
136                          continue 
137                  except: 
138                      pass          
139              result = urlparse.urlparse(local_filename) 
140              source_host = result.netloc 
141              source_path = result.path 
142              source_user = result.username 
143              logger.debug(str((source_host, source_path, self.host, remote_path))) 
144              self.__run_scp_command(self.userkey, source_user, source_host, source_path, self.user, self.host, remote_path)         
 145       
146     
151           
152       
153 -    def get_du(self, du, target_url): 
 154          remote_url = target_url 
155          local_url =  self.service_url  + "/" + str(du.id) 
156          logger.debug("get_du(): copy %s to %s:"%(local_url, remote_url)) 
157          self.copy_du_to_url(du, local_url, remote_url)   
 158           
159           
161          self.__remove_directory(os.path.join(self.path, du.id)) 
 162       
163           
165          logger.debug("Bytes transfered %d/%d"%(transfered_bytes, total_bytes)) 
 166       
167           
168       
169       
170       
171       
172 -    def transfer(self, source_url, target_url): 
 173          self.__third_party_transfer_scp(source_url, target_url)     
 174       
175       
177          result = urlparse.urlparse(target_url) 
178          target_host = result.hostname 
179          target_path = result.path 
180          target_user = result.username 
181          logger.debug("Create directory: %s"%target_path) 
182          command = "mkdir %s"%target_path 
183          rc = self.__run_ssh_command(self.userkey, target_user, target_host, command) 
184          if rc==0: 
185              return True 
186          else: 
187              return False 
 188                   
189           
191          result = urlparse.urlparse(target_url) 
192          return result.path 
 193       
194           
198     
199               
200       
201       
203          return os.path.join(self.path, str(du.id)) 
 204       
205       
207          """Remove remote directory that may contain files.         
208          """ 
209          if self.__exists(path): 
210              command = "rm -rf %s"%path 
211              rc = self.__run_ssh_command(self.userkey, self.user, self.host, command) 
212              if rc==0: 
213                  return True 
214              else: 
215                  return False 
 216               
217           
219          result = urlparse.urlparse(url) 
220          host = result.hostname 
221          path = result.path 
222          user = result.username 
223           
224          command = "test -d %s"%path 
225          rc = self.__run_ssh_command(self.userkey, user, host, command) 
226          if rc==0: 
227              logger.debug("Directory found: %s"%path) 
228              return True 
229          else: 
230              logger.debug("Directory not found: %s"%path) 
231              return False 
 232               
233   
235          result = urlparse.urlparse(source_url) 
236          source_host = result.hostname 
237          source_path = result.path 
238          source_user = result.username 
239          if source_host==None or source_host=="": 
240              source_host="localhost" 
241   
242          result = urlparse.urlparse(target_url) 
243          target_host = result.netloc 
244          target_path = result.path 
245          target_user = result.username 
246          if target_host==None or target_host=="": 
247              target_host="localhost" 
248   
249           
250          if os.path.exists(os.path.dirname(source_path)): 
251              logger.debug("Target and source host are localhost. Processing: %s" %(source_path)) 
252              expanded_path = glob.glob(source_path) 
253              logger.debug("Expanded path: " + str(expanded_path)) 
254              for path in expanded_path:  
255                  if os.path.isdir(path): 
256                      logger.debug("Source path %s is directory"%path) 
257                      files = os.listdir(path) 
258                      for i in files: 
259                          try: 
260                              os.symlink(os.path.join(files, i), target_path) 
261                          except: 
262                              self.__print_traceback() 
263                  else: 
264                      try: 
265                          os.symlink(path, os.path.join(target_path, os.path.basename(path))) 
266                      except: 
267                          self.__print_traceback() 
268          else: 
269              self.__run_scp_command(self.userkey, source_user, source_host, source_path, target_user, target_host, target_path) 
 270   
271   
272    
274          """Return True if the remote path exists 
275          """ 
276          command = "test -e %s"%path 
277          rc = self.__run_ssh_command(self.userkey, self.user, self.host, command) 
278          if rc==0: 
279              return True 
280          else: 
281              return False 
 282           
283    
284       
286          prefix="" 
287          if host != None: 
288              prefix = "ssh " + SSH_OPTS + " " 
289              if userkey != None: 
290                  prefix = prefix + " -i " + userkey + " " 
291              if user!=None: 
292                  prefix = prefix + " " + user+ "@"  
293              prefix = prefix + host 
294           
295          command = prefix + " " + command 
296          logger.debug(command.strip()) 
297          child = pexpect.spawn(command.strip(), timeout=None) 
298          output = child.readlines() 
299          logger.debug("Run %s Output: %s"%(command, str(output))) 
300          child.close() 
301          return output  
 302       
303   
304   
305 -    def __run_scp_command(self, userkey, source_user, source_host, source_path, target_user, target_host, target_path): 
 306          logger.debug("Create scp command: source_user: %s, source_host: %s"%(source_user, source_host)) 
307          command = "scp " + SSH_OPTS + " " 
308          if userkey != None: 
309              command = command + "-i " + userkey + " " 
310          if source_user!=None: 
311              command = command + " " + source_user + "@"  
312          if source_host != None and source_host!="" and source_host!="localhost": 
313              command = command + source_host + ":" 
314           
315           
316          command = command + source_path + " " 
317           
318          if target_user!=None: 
319              command = command + " " + target_user + "@"  
320          
321          if target_host != None and target_host!="" and target_host!="localhost": 
322              command = command + target_host + ":" 
323               
324          command = command + target_path  
325          logger.debug(command)     
326          child = pexpect.spawn(command.strip(), timeout=None) 
327          output = child.readlines() 
328          logger.debug("Run %s Output: %s"%(command, str(output))) 
329          child.close() 
330          return child.exitstatus 
 331           
332       
333      
335          exc_type, exc_value, exc_traceback = sys.exc_info() 
336          print "*** print_tb:" 
337          traceback.print_tb(exc_traceback, limit=1, file=sys.stderr) 
338          print "*** print_exception:" 
339          traceback.print_exception(exc_type, exc_value, exc_traceback, 
340                                limit=2, file=sys.stderr) 
 341