Package pilot :: Package filemanagement :: Module gs_adaptor

Source Code for Module pilot.filemanagement.gs_adaptor

  1  ''' 
  2  Google Storage based File Transfer Implementation 
  3  ''' 
  4  import urlparse 
  5   
  6  import errno 
  7  import sys 
  8  import os 
  9  import stat 
 10  import logging 
 11  import traceback 
 12  import time 
 13   
 14  sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../..")) 
 15  from pilot.api import State 
 16  from bigjob import logger 
 17   
 18  from apiclient.discovery import build 
 19  from apiclient.http import MediaFileUpload 
 20  from oauth2client.file import Storage 
 21  from oauth2client.client import OAuth2WebServerFlow 
 22  from oauth2client.client import Credentials 
 23  from oauth2client.tools import run 
 24  import httplib2 
 25  import urllib 
 26   
 27  """ 
 28  AN OAUTH2 Client Id must be created at the Google API console at: 
 29   
 30  https://code.google.com/apis/console/ 
 31   
 32  => API Access 
 33   
 34  More information with respect to OAUTH: https://developers.google.com/compute/docs/api/how-tos/authorization 
 35  """ 
 36  OAUTH2_CLIENT_ID='1004462711324-55akehip32m59u6omdfrt9s8u8ehb0hm.apps.googleusercontent.com' 
 37  OAUTH2_CLIENT_SECRET='EIMML1W7anu0XijVghws0DY-' 
 38   
 39  GS_PROJECT_ID="1004462711324" 
 40   
 41   
class GSFileAdaptor(object):
    """ BigData File Management for Pilot Data backed by Google Storage.

    A Pilot Data corresponds to one bucket, whose name is derived from the
    ``gs://<bucket>`` service URL. Files of a Data Unit (DU) are stored as
    objects named ``<du_id>/<file basename>``.
    """

    def __init__(self, service_url, security_context=None):
        """Create an adaptor for the bucket named by ``service_url``.

        :param service_url: URL of the form ``gs://<bucket>``. The scheme and
            every ``/`` are removed to obtain the bucket name.
        :param security_context: JSON-serialized OAuth2 credentials, as
            returned by :meth:`get_security_context`. When ``None``,
            credentials are read from the file ``gce.dat`` (relative to the
            current working directory). If none are found, or they are
            invalid, an interactive OAuth2 flow is run via oauth2client's
            ``run`` with that same storage.
        """
        # Initializations of instance variables
        self.service_url = service_url
        self.bucket_name = self.__get_bucket_name(service_url)
        self.__state = State.New

        # Do OAUTH authentication
        if security_context is not None:
            logger.debug("Attempt to restore credentials from security context: " + str(security_context))
            self.credentials = Credentials.new_from_json(security_context)
        else:
            storage = Storage('gce.dat')
            self.credentials = storage.get()
            if self.credentials is None or self.credentials.invalid:
                logger.debug("No valid credential found. Run new OAuth authentication round...")
                flow = OAuth2WebServerFlow(
                    client_id=OAUTH2_CLIENT_ID,
                    client_secret=OAUTH2_CLIENT_SECRET,
                    scope=['https://www.googleapis.com/auth/devstorage.full_control',
                           'https://www.googleapis.com/auth/compute'],
                    user_agent='bigjob-client/1.0')

                self.credentials = run(flow, storage)

    def get_security_context(self):
        """ Returns security context that needs to be available on the distributed
            node in order to access this Pilot Data.

        The context is the JSON serialization of the OAuth2 credentials. It
        can be passed back as ``security_context`` to the constructor.
        """
        return self.credentials.to_json()

    def initialize_pilotdata(self):
        """Mark the Pilot Data as running and try to create its bucket.

        Bucket creation is best-effort. Any failure (e.g. because the bucket
        already exists) is logged at debug level and otherwise ignored.
        """
        self.__state = State.Running
        request_dict = {
            "id": self.bucket_name,
            "projectId": GS_PROJECT_ID,
        }
        logger.debug(str(request_dict))
        try:
            gs = self.__get_api_client()[0]
            gs.buckets().insert(body=request_dict).execute()
        except Exception as ex:
            # Do nothing if the bucket already exists (or cannot be created).
            # Keep the reason in the log so real failures stay visible.
            logger.debug("Error creating bucket: " + self.bucket_name + " (" + str(ex) + ")")

    def get_pilotdata_size(self):
        """Return the capacity of this Pilot Data; ``None`` means unlimited."""
        return None

    def delete_pilotdata(self):
        """Mark the Pilot Data as done.

        Only the state changes: the bucket and its objects are not removed.
        """
        self.__state = State.Done

    def get_state(self):
        """Return the current State (New, Running or Done)."""
        return self.__state

    def create_du(self, du_id):
        """Create the marker object ``<du_id>/du_info`` for a new Data Unit.

        The object is inserted with an inline media body whose data is
        ``du_id``.
        """
        gs = self.__get_api_client()[0]
        o = gs.objects().insert(bucket=self.bucket_name, name=str(du_id) + "/du_info",
                                body={'media': {
                                    "contentType": "text/ascii",
                                    "data": du_id
                                }}
                                ).execute()
        logger.debug("Created GS: " + str(o))

    def put_du(self, du):
        """Upload every local file of ``du`` to ``<du.id>/<basename>``.

        ``du.list()`` is expected to return a mapping whose values hold the
        local file path under the ``"local"`` key. Uploads are best-effort:
        a file that fails is logged and skipped, and the remaining files are
        still copied.
        """
        logger.debug("Copy DU to Google Storage")
        for key, item in du.list().items():
            try:
                local_filename = item["local"]
                # Object names always use "/", independent of the local OS
                # (matching create_du and the prefix used by get_du/remove_du).
                remote_path = str(du.id) + "/" + os.path.basename(local_filename)
                self._put_file(local_filename, remote_path)
            except Exception as ex:
                logger.debug("Could not copy file: " + str(key) + " (" + str(ex) + ")")

    def copy_du(self, du, pd_new):
        """Not implemented: does nothing."""
        pass

    def get_du(self, du, target_url):
        """Download all objects of ``du`` into the local directory ``target_url``.

        Every object named ``<du.id>/...`` is fetched, including the
        ``du_info`` marker, and stored under its basename. Objects of other
        DUs whose id merely starts with ``du.id`` are not included.
        """
        du_id = du.id
        logger.debug("Get DU: " + str(du_id))
        gs = self.__get_api_client()[0]
        for full_filename in self.__list_object_names(gs, str(du_id) + "/"):
            self._get_file(full_filename,
                           os.path.join(target_url, os.path.basename(full_filename)))

    def remove_du(self, du):
        """Delete every object stored under ``<du.id>/`` from the bucket."""
        gs = self.__get_api_client()[0]
        for name in self.__list_object_names(gs, str(du.id) + "/"):
            logger.debug("Delete object: " + name)
            gs.objects().delete(bucket=self.bucket_name, object=name).execute()

    ###########################################################################
    # Pure File Management APIs
    def _put_file(self, source, target):
        """Upload the local file ``source`` as object ``target`` in the bucket."""
        logger.debug("Put file: %s to %s" % (source, target))
        gs = self.__get_api_client()[0]
        o = gs.objects().insert(bucket=self.bucket_name,
                                name=target,
                                media_body=source).execute()
        logger.debug("Put file result: %s" % str(o))

    def _get_file(self, source, target):
        """Download object ``source`` from the bucket into the local file ``target``.

        The object's metadata is fetched first. The content is then
        downloaded from its ``media.link`` URL using the authorized HTTP
        client. Nothing is written if that link is empty.

        :raises IOError: if the download does not return HTTP status 200.
        """
        logger.debug("GET file: %s to %s" % (source, target))
        gs, http = self.__get_api_client()
        metadata = gs.objects().get(bucket=self.bucket_name,
                                    object=source).execute()
        logger.debug("Get file result: %s" % str(metadata))
        download_url = metadata["media"]['link']
        if download_url:
            response, content = http.request(download_url)
            logger.debug("Download file response: %d" % (response.status))
            if response.status != 200:
                # Never store an HTTP error body as if it were the file content.
                raise IOError("Download of %s failed with HTTP status %d"
                              % (source, response.status))
            with open(target, 'wb') as out:
                out.write(content)

    def transfer(self, source_url, target_url):
        """Not implemented: does nothing."""
        pass

    def create_remote_directory(self, target_url):
        """Nothing needs to be created for this adaptor; always returns True."""
        return True

    def get_path(self, target_url):
        """Return the path component of ``target_url``."""
        result = urlparse.urlparse(target_url)
        target_path = result.path
        return target_path

    ###########################################################################
    # Auxiliary functions
    def __get_api_client(self):
        """Build a Google Storage (v1beta1) API client.

        :returns: tuple ``(service, http)``, where ``http`` is the
            credential-authorized httplib2 client used by the service.
        """
        http = httplib2.Http()
        http = self.credentials.authorize(http)
        gs = build("storage", "v1beta1", http=http)
        return gs, http

    def __list_object_names(self, gs, prefix):
        """Return the names of all bucket objects whose name starts with ``prefix``.

        Follows ``nextPageToken`` until the listing is exhausted. A page
        without an ``items`` key (e.g. no matches) counts as empty.
        """
        names = []
        page_token = None
        while True:
            list_args = {"bucket": self.bucket_name, "prefix": prefix}
            if page_token:
                list_args["pageToken"] = page_token
            result = gs.objects().list(**list_args).execute()
            logger.debug("Result: " + str(result))
            names.extend(item["name"] for item in result.get("items", []))
            page_token = result.get("nextPageToken")
            if not page_token:
                return names

    def __get_bucket_name(self, service_url):
        """Strip the ``gs://`` scheme and every ``/`` from ``service_url``."""
        bucket_name = service_url.replace("gs://", "")
        bucket_name = bucket_name.replace("/", "")
        return bucket_name

    def __print_traceback(self):
        """Print the exception currently being handled to stdout (debug aid)."""
        exc_type, exc_value, exc_traceback = sys.exc_info()
        print("*** print_tb:")
        traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
        print("*** print_exception:")
        traceback.print_exception(exc_type, exc_value, exc_traceback,
                                  limit=2, file=sys.stdout)
if __name__ == "__main__":
    # Manual smoke test against the real Google Storage service. It needs
    # valid OAuth credentials (see GSFileAdaptor.__init__) and network access.
    gs = GSFileAdaptor("gs://pilot-data-bucket-1234")
    gs.initialize_pilotdata()
    gs._put_file("test-random.exe", "test.exe")
    gs._get_file("test.txt", "test2.txt")

    class _DemoDU(object):
        # Minimal Data Unit stand-in: get_du() only reads the ``id`` attribute
        # (passing None, as before, always failed with AttributeError).
        id = "test-du"

    gs.get_du(_DemoDU(), ".")