  1  ''' 
  2  SSH-based coordination scheme between manager and agent 
  3  ''' 
  4  import urlparse 
  5  import pdb 
  6  import glob 
  7  import errno 
  8  import sys 
  9  import os 
 10  import stat 
 11  import logging 
 12  import traceback 
 13  import pexpect 
 15  sys.path.append(os.path.join(os.path.dirname(__file__), "../..")) 
 16  from pilot.api import State 
 17  from bigjob import logger 
 19  SSH_OPTS="-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o NumberOfPasswordPrompts=0" 
22 -class SSHFileAdaptor(object):
23 """ BigData Coordination File Management for Pilot Store """ 24
25 - def __init__(self, service_url, security_context=None, pilot_data_description=None):
26 self.service_url = service_url 27 result = urlparse.urlparse(service_url) 28 = result.hostname 29 self.path = result.path 30 self.user = result.username 31 32 self.pilot_data_description=pilot_data_description 33 34 # handle security context 35 self.userkey=None 36 self.security_context=security_context 37 38 # try to recover key from pilot_data_description 39 if self.pilot_data_description!=None and\ 40 self.pilot_data_description.has_key("userkey"): 41 self.userkey=self.pilot_data_description["userkey"] 42 43 logger.debug("Security Context: " + str(self.security_context)) 44 45 # try to recover key from security context 46 if security_context!=None and security_context!="None": 47 logger.debug("Attempt to restore SSH credentials from security context: " + str(security_context)) 48 security_context = eval(security_context) 49 key=security_context["userkey"] 50 self.userkey=os.path.join(os.getcwd(), ".ssh/id_rsa") 51 if os.path.exists(os.path.join(os.getcwd(),".ssh"))==False: 52 os.makedirs(os.path.join(os.getcwd(), ".ssh")) 53 logger.debug("Write key: " + str(type(key)) + " to: " + self.userkey) 54 try: 55 f = open(self.userkey, "w") 56 for i in key: 57 logger.debug("write: " + str(i)) 58 f.write(i) 59 f.close() 60 os.chmod(self.userkey, 0600) 61 except: 62 self.__print_traceback()
63 64
65 - def get_security_context(self):
66 """ Returns security context that needs to be available on the distributed 67 node in order to access this Pilot Data """ 68 if (self.security_context==None or self.security_context=="None") and self.pilot_data_description.has_key("userkey"): 69 f = open(self.pilot_data_description["userkey"]) 70 key = f.readlines() 71 f.close 72 self.security_context = {"userkey":key} 73 logger.debug("Return security context: " + str(self.security_context)) 74 return self.security_context
75 76
77 - def initialize_pilotdata(self):
78 # check whether directory exists 79 try: 80 command = "mkdir -p %s"%self.path 81 self.__run_ssh_command(self.userkey, self.user,, command) 82 except IOError: 83 self.__print_traceback() 84 # directory does not exist 85 self.__state=State.Running
86 87
88 - def get_pilotdata_size(self):
89 return None
90 91
92 - def delete_pilotdata(self):
93 self.__remove_directory(self.path) 94 self.__state=State.Done
95 96
97 - def get_state(self):
98 if self.__client.get_transport().is_active()==True: 99 return self.__state 100 else: 101 self.__state=State.Failed 102 return self.__state
104 - def create_du(self, du_id):
105 du_dir = os.path.join(self.path, str(du_id)) 106 logger.debug("/bin/date") 107 command = "/bin/date" 108 self.__run_ssh_command(self.userkey, self.user,, command) 109 logger.debug("mkdir: " + du_dir) 110 command = "mkdir %s"%du_dir 111 self.__run_ssh_command(self.userkey, self.user,, command)
112 113
114 - def put_du(self, du):
115 self.put_du_scp(du)
116 117
118 - def put_du_scp(self, du):
119 logger.debug("Copy DU using SCP") 120 du_items = du.list() 121 for i in du_items.keys(): 122 local_filename = du_items[i]["local"] 123 remote_path = os.path.join(self.path, str(, os.path.basename(local_filename)) 124 logger.debug("Put file: %s to %s"%(i, remote_path)) 125 if local_filename.startswith("ssh://"): 126 # check if remote path is directory 127 if self.__is_remote_directory(local_filename): 128 logger.warning("Path %s is a directory. Ignored."%local_filename) 129 continue 130 131 #self.__third_party_transfer(i.local_url, remote_path) 132 else: 133 try: 134 if stat.S_ISDIR(os.stat(local_filename).st_mode): 135 logger.warning("Path %s is a directory. Ignored."%local_filename) 136 continue 137 except: 138 pass 139 result = urlparse.urlparse(local_filename) 140 source_host = result.netloc 141 source_path = result.path 142 source_user = result.username 143 logger.debug(str((source_host, source_path,, remote_path))) 144 self.__run_scp_command(self.userkey, source_user, source_host, source_path, self.user,, remote_path)
145 146
147 - def copy_du(self, du, pd_new):
148 remote_url = pd_new.service_url + "/" + str( 149 local_url = self.service_url + "/" + str( 150 self.copy_du_to_url(du, local_url, remote_url)
151 152
153 - def get_du(self, du, target_url):
154 remote_url = target_url 155 local_url = self.service_url + "/" + str( 156 logger.debug("get_du(): copy %s to %s:"%(local_url, remote_url)) 157 self.copy_du_to_url(du, local_url, remote_url)
158 159
160 - def remove_du(self, du):
161 self.__remove_directory(os.path.join(self.path,
162 163
164 - def put_progress(self, transfered_bytes, total_bytes):
165 logger.debug("Bytes transfered %d/%d"%(transfered_bytes, total_bytes))
166 167 168 169 #################################################################################### 170 # pure file management methods 171 # used by BJ file staging
172 - def transfer(self, source_url, target_url):
173 self.__third_party_transfer_scp(source_url, target_url)
174 175
176 - def create_remote_directory(self, target_url):
177 result = urlparse.urlparse(target_url) 178 target_host = result.hostname 179 target_path = result.path 180 target_user = result.username 181 logger.debug("Create directory: %s"%target_path) 182 command = "mkdir %s"%target_path 183 rc = self.__run_ssh_command(self.userkey, target_user, target_host, command) 184 if rc==0: 185 return True 186 else: 187 return False
188 189
190 - def get_path(self, target_url):
191 result = urlparse.urlparse(target_url) 192 return result.path
193 194
195 - def copy_du_to_url(self, du, local_url, remote_url):
196 self.create_remote_directory(remote_url) 197 self.__third_party_transfer_scp(local_url + "/*", remote_url)
198 199 200 ########################################################################### 201 # Private support methods
202 - def __get_path_for_du(self, du):
203 return os.path.join(self.path, str(
204 205
206 - def __remove_directory(self, path):
207 """Remove remote directory that may contain files. 208 """ 209 if self.__exists(path): 210 command = "rm -rf %s"%path 211 rc = self.__run_ssh_command(self.userkey, self.user,, command) 212 if rc==0: 213 return True 214 else: 215 return False
216 217
218 - def __is_remote_directory(self, url):
219 result = urlparse.urlparse(url) 220 host = result.hostname 221 path = result.path 222 user = result.username 223 224 command = "test -d %s"%path 225 rc = self.__run_ssh_command(self.userkey, user, host, command) 226 if rc==0: 227 logger.debug("Directory found: %s"%path) 228 return True 229 else: 230 logger.debug("Directory not found: %s"%path) 231 return False
232 233
234 - def __third_party_transfer_scp(self, source_url, target_url):
235 result = urlparse.urlparse(source_url) 236 source_host = result.hostname 237 source_path = result.path 238 source_user = result.username 239 if source_host==None or source_host=="": 240 source_host="localhost" 241 242 result = urlparse.urlparse(target_url) 243 target_host = result.netloc 244 target_path = result.path 245 target_user = result.username 246 if target_host==None or target_host=="": 247 target_host="localhost" 248 249 #check whether this is a local transfer 250 if os.path.exists(os.path.dirname(source_path)): 251 logger.debug("Target and source host are localhost. Processing: %s" %(source_path)) 252 expanded_path = glob.glob(source_path) 253 logger.debug("Expanded path: " + str(expanded_path)) 254 for path in expanded_path: 255 if os.path.isdir(path): 256 logger.debug("Source path %s is directory"%path) 257 files = os.listdir(path) 258 for i in files: 259 try: 260 os.symlink(os.path.join(files, i), target_path) 261 except: 262 self.__print_traceback() 263 else: 264 try: 265 os.symlink(path, os.path.join(target_path, os.path.basename(path))) 266 except: 267 self.__print_traceback() 268 else: 269 self.__run_scp_command(self.userkey, source_user, source_host, source_path, target_user, target_host, target_path)
270 271 272
273 - def __exists(self, path):
274 """Return True if the remote path exists 275 """ 276 command = "test -e %s"%path 277 rc = self.__run_ssh_command(self.userkey, self.user,, command) 278 if rc==0: 279 return True 280 else: 281 return False
282 283 284
285 - def __run_ssh_command(self, userkey, user, host, command):
286 prefix="" 287 if host != None: 288 prefix = "ssh " + SSH_OPTS + " " 289 if userkey != None: 290 prefix = prefix + " -i " + userkey + " " 291 if user!=None: 292 prefix = prefix + " " + user+ "@" 293 prefix = prefix + host 294 295 command = prefix + " " + command 296 logger.debug(command.strip()) 297 child = pexpect.spawn(command.strip(), timeout=None) 298 output = child.readlines() 299 logger.debug("Run %s Output: %s"%(command, str(output))) 300 child.close() 301 return output
302 303 304
305 - def __run_scp_command(self, userkey, source_user, source_host, source_path, target_user, target_host, target_path):
306 logger.debug("Create scp command: source_user: %s, source_host: %s"%(source_user, source_host)) 307 command = "scp " + SSH_OPTS + " " 308 if userkey != None: 309 command = command + "-i " + userkey + " " 310 if source_user!=None: 311 command = command + " " + source_user + "@" 312 if source_host != None and source_host!="" and source_host!="localhost": 313 command = command + source_host + ":" 314 315 # path is a must parameter 316 command = command + source_path + " " 317 318 if target_user!=None: 319 command = command + " " + target_user + "@" 320 321 if target_host != None and target_host!="" and target_host!="localhost": 322 command = command + target_host + ":" 323 324 command = command + target_path 325 logger.debug(command) 326 child = pexpect.spawn(command.strip(), timeout=None) 327 output = child.readlines() 328 logger.debug("Run %s Output: %s"%(command, str(output))) 329 child.close() 330 return child.exitstatus
331 332 333
334 - def __print_traceback(self):
335 exc_type, exc_value, exc_traceback = sys.exc_info() 336 print "*** print_tb:" 337 traceback.print_tb(exc_traceback, limit=1, file=sys.stderr) 338 print "*** print_exception:" 339 traceback.print_exception(exc_type, exc_value, exc_traceback, 340 limit=2, file=sys.stderr)