'''
Google Storage based File Transfer Implementation
'''
import urlparse

import errno
import sys
import os
import stat
import logging
import traceback
import time

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../.."))
from pilot.api import State
from bigjob import logger

from apiclient.discovery import build
from apiclient.http import MediaFileUpload
from oauth2client.file import Storage
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import Credentials
from oauth2client.tools import run
import httplib2
import urllib

 27  """ 
 28  AN OAUTH2 Client Id must be created at the Google API console at: 
 29   
 30  https://code.google.com/apis/console/ 
 31   
 32  => API Access 
 33   
 34  More information with respect to OAUTH: https://developers.google.com/compute/docs/api/how-tos/authorization 
 35  """ 
 36  OAUTH2_CLIENT_ID='1004462711324-55akehip32m59u6omdfrt9s8u8ehb0hm.apps.googleusercontent.com' 
 37  OAUTH2_CLIENT_SECRET='EIMML1W7anu0XijVghws0DY-' 
 38   
 39  GS_PROJECT_ID="1004462711324" 
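
# A minimal usage sketch (bucket and file names are illustrative; mirrors the
# __main__ block at the bottom of this file):
#
#   adaptor = GSFileAdaptor("gs://pilot-data-bucket-1234")
#   adaptor.initialize_pilotdata()                  # creates the bucket if needed
#   adaptor._put_file("test-random.exe", "test.exe")
#   adaptor._get_file("test.exe", "test-copy.exe")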


class GSFileAdaptor(object):
    """ BigData File Management for Pilot Data """


    def __init__(self, service_url, security_context=None):
        self.service_url = service_url
        self.bucket_name = self.__get_bucket_name(service_url)
        self.__state=State.New

        # Restore credentials from the security context if provided; otherwise
        # fall back to the locally cached OAuth credentials (gce.dat).
        storage = Storage('gce.dat')
        if security_context!=None:
            logger.debug("Attempt to restore credentials from security context: " + str(security_context))
            self.credentials = Credentials.new_from_json(security_context)
        else:
            self.credentials = storage.get()

        if self.credentials is None or self.credentials.invalid:
            logger.debug("No valid credential found. Run new OAuth authentication round...")
            flow = OAuth2WebServerFlow(
                                       client_id=OAUTH2_CLIENT_ID,
                                       client_secret=OAUTH2_CLIENT_SECRET,
                                       scope=['https://www.googleapis.com/auth/devstorage.full_control',
                                              'https://www.googleapis.com/auth/compute'],
                                       user_agent='bigjob-client/1.0')

            self.credentials = run(flow, storage)


    def get_security_context(self):
        """ Returns security context that needs to be available on the distributed
            node in order to access this Pilot Data """
        return self.credentials.to_json()
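
    # Sketch (assumption about typical usage): a submitting host can hand this
    # JSON context to a worker node, which then builds an equally privileged
    # adaptor without needing its own gce.dat, e.g.:
    #   ctx = adaptor.get_security_context()
    #   worker_adaptor = GSFileAdaptor(service_url, security_context=ctx)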


    def initialize_pilotdata(self):
        # Create the bucket that backs this Pilot Data store.
        self.__state=State.Running
        request_dict = {
                        "id":self.bucket_name,
                        "projectId":GS_PROJECT_ID
        }
        logger.debug(str(request_dict))
        try:
            gs = self.__get_api_client()[0]
            gs.buckets().insert(body=request_dict).execute()
        except:
            logger.debug("Error creating bucket: " + self.bucket_name)
            pass
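
    # Note: Google Cloud Storage bucket names are global, so the bucket part of
    # the service_url (e.g. "pilot-data-bucket-1234") must be unique across all
    # projects, not just within GS_PROJECT_ID; the blanket except above also
    # hides the case where the bucket already exists.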


    def create_du(self, du_id):
        # Store a small marker object "<du_id>/du_info" that records the id of
        # the new Data Unit.
        gs = self.__get_api_client()[0]
        o = gs.objects().insert(bucket=self.bucket_name, name=str(du_id)+"/du_info",
                                body={'media': {
                                                "contentType":"text/ascii",
                                                "data": du_id
                                                }
                                      }
                                ).execute()
        logger.debug("Created GS: " + str(o))
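
    # Note: the inline 'media' body above and the f["media"]["link"] field read
    # in _get_file() are tied to the legacy "v1beta1" storage API built in
    # __get_api_client(); later API versions expose this as 'mediaLink' instead.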


    def put_du(self, du):
        logger.debug("Copy DU to Google Storage")
        du_items = du.list()
        for i in du_items.keys():
            try:
                local_filename=du_items[i]["local"]
                remote_path = os.path.join(str(du.id), os.path.basename(local_filename))
                self._put_file(local_filename, remote_path)
            except:
                logger.debug("Could not copy file: " + (str(i)))
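
    # Assumption about the Pilot API: du.list() is treated here as a dict keyed
    # by logical file name whose values carry at least a "local" path, roughly:
    #   {"input.txt": {"local": "/tmp/input.txt"}, ...}
    # Only the "local" entry is used; this layout is inferred from the code
    # above, not taken from the Pilot API documentation.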


    def get_du(self, du, target_url):
        du_id=du.id
        logger.debug("Get DU: " + str(du_id))
        gs = self.__get_api_client()[0]
        result = gs.objects().list(bucket=self.bucket_name, prefix=du_id).execute()
        logger.debug("Result: " + str(result))
        for i in result["items"]:
            full_filename = i["name"]
            self._get_file(full_filename, os.path.join(target_url, os.path.basename(full_filename)))


    def remove_du(self, du):
        self.__remove_directory(os.path.join(self.bucket_name, du.id))
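    # Note: __remove_directory() is not defined anywhere in this adaptor as
    # shown, so remove_du() is effectively a stub; an actual implementation
    # would have to enumerate and delete the DU's objects individually
    # (e.g. via gs.objects().list() / objects().delete()).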


    def _put_file(self, source, target):
        logger.debug("Put file: %s to %s"%(source, target))
        gs = self.__get_api_client()[0]
        o = gs.objects().insert(bucket=self.bucket_name,
                                name=target,
                                media_body=source).execute()
        logger.debug("Put file result: %s"%str(o))
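
    # Possible variant (sketch, not from the original code): the MediaFileUpload
    # helper imported above could replace the bare path to set an explicit
    # content type or enable resumable uploads, e.g.:
    #   media = MediaFileUpload(source, resumable=True)
    #   gs.objects().insert(bucket=self.bucket_name, name=target,
    #                       media_body=media).execute()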


    def _get_file(self, source, target):
        logger.debug("GET file: %s to %s"%(source, target))
        gs, http = self.__get_api_client()
        f = gs.objects().get(bucket=self.bucket_name,
                             object=source).execute()
        logger.debug("Get file result: %s"%str(f))
        downloadUrl = f["media"]['link']
        if downloadUrl:
            response, content = http.request(downloadUrl)
            logger.debug("Download file response: %d"%(response.status))
            with open(target, 'wb') as local_file:
                local_file.write(content)
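
    # Note: http.request() buffers the whole object in memory before it is
    # written out, so this download path only suits reasonably small files;
    # large objects would need a chunked/streaming download instead.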


    def transfer(self, source_url, target_url):
        pass


    def __get_path_for_url(self, target_url):
        # Private helper: return the path component of a URL.
        result = urlparse.urlparse(target_url)
        target_path = result.path
        return target_path


    def __get_api_client(self):
        # Build an authorized client for the Google Storage JSON API (v1beta1).
        http = httplib2.Http()
        http = self.credentials.authorize(http)
        gs = build("storage", "v1beta1", http=http)
        return gs, http


    def __get_bucket_name(self, service_url):
        bucket_name = service_url.replace("gs://", "")
        bucket_name = bucket_name.replace("/", "")
        return bucket_name
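    # Example: "gs://pilot-data-bucket-1234/" -> "pilot-data-bucket-1234".
    # Any further path components are flattened as well (all slashes are
    # stripped), so sub-directories cannot be encoded in the service_url.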


    def __print_traceback(self):
        exc_type, exc_value, exc_traceback = sys.exc_info()
        print "*** print_tb:"
        traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
        print "*** print_exception:"
        traceback.print_exception(exc_type, exc_value, exc_traceback,
                                  limit=2, file=sys.stdout)


if __name__ == "__main__":
    gs = GSFileAdaptor("gs://pilot-data-bucket-1234")
    gs.initialize_pilotdata()
    gs._put_file("test-random.exe", "test.exe")
    gs._get_file("test.txt", "test2.txt")
    gs.get_du(None, ".")
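    # Caveats for this smoke test (as written above): if no valid credential is
    # cached in gce.dat, run(flow, storage) starts an interactive OAuth round
    # (oauth2client opens a browser or prompts on the console), and
    # get_du(None, ".") dereferences du.id, so it needs a real DU object from
    # the Pilot API to succeed.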