hadoop.py
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements.  See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership.  The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
"""define WorkLoad as abstract interface for user job"""
# -*- python -*-

import os, time, sys, shutil, exceptions, re, threading, signal, urllib, pprint, math

from HTMLParser import HTMLParser

import xml.dom.minidom
import xml.dom.pulldom
from xml.dom import getDOMImplementation

from hodlib.Common.util import *
from hodlib.Common.xmlrpc import hodXRClient
from hodlib.Common.miniHTMLParser import miniHTMLParser
from hodlib.Common.nodepoolutil import NodePoolUtil
from hodlib.Common.tcp import tcpError, tcpSocket

reCommandDelimeterString = r"(?<!\\);"
reCommandDelimeter = re.compile(reCommandDelimeterString)

class hadoopConfig:
  def __create_xml_element(self, doc, name, value, description, final = False):
    prop = doc.createElement("property")
    nameP = doc.createElement("name")
    string = doc.createTextNode(name)
    nameP.appendChild(string)
    valueP = doc.createElement("value")
    string = doc.createTextNode(value)
    valueP.appendChild(string)
    if final:
      finalP = doc.createElement("final")
      string = doc.createTextNode("true")
      finalP.appendChild(string)
    desc = doc.createElement("description")
    string = doc.createTextNode(description)
    desc.appendChild(string)
    prop.appendChild(nameP)
    prop.appendChild(valueP)
    if final:
      prop.appendChild(finalP)
    prop.appendChild(desc)

    return prop

  def gen_site_conf(self, confDir, tempDir, numNodes, hdfsAddr, mrSysDir,\
                    mapredAddr=None, clientParams=None, serverParams=None,\
                    finalServerParams=None, clusterFactor=None):
    if not mapredAddr:
      mapredAddr = "dummy:8181"

    implementation = getDOMImplementation()
    doc = implementation.createDocument('', 'configuration', None)
    comment = doc.createComment(
      "This is an auto generated hadoop-site.xml, do not modify")
    topElement = doc.documentElement
    topElement.appendChild(comment)

    description = {}
    paramsDict = { 'mapred.job.tracker'    : mapredAddr , \
                   'fs.default.name'       : "hdfs://" + hdfsAddr, \
                   'hadoop.tmp.dir'        : tempDir, \
                   'dfs.client.buffer.dir' : os.path.join(tempDir, 'dfs', 'tmp'),
                 }
    paramsDict['mapred.system.dir'] = mrSysDir

    # mapred-default.xml is no longer used now.
    numred = int(math.floor(clusterFactor * (int(numNodes) - 1)))
    paramsDict['mapred.reduce.tasks'] = str(numred)
    # end

    # for all the above vars generated, set the description
    for k, v in paramsDict.iteritems():
      description[k] = 'Hod generated parameter'

    # finalservelParams
    if finalServerParams:
      for k, v in finalServerParams.iteritems():
        if not description.has_key(k):
          description[k] = "final server parameter"
          paramsDict[k] = v

    # servelParams
    if serverParams:
      for k, v in serverParams.iteritems():
        if not description.has_key(k):
          # if no final value for same param is mentioned
          description[k] = "server parameter"
          paramsDict[k] = v

    # clientParams
    if clientParams:
      for k, v in clientParams.iteritems():
        if not description.has_key(k) or description[k] == "server parameter":
          # Just add, if no final value for same param is mentioned.
          # Replace even if server param is mentioned for same config variable
          description[k] = "client-side parameter"
          paramsDict[k] = v

    # generate the xml elements
    for k,v in paramsDict.iteritems():
      if ( description[k] == "final server parameter" or \
                             description[k] == "Hod generated parameter" ):
        final = True
      else:
        final = False
      prop = self.__create_xml_element(doc, k, v, description[k], final)
      topElement.appendChild(prop)

    siteName = os.path.join(confDir, "hadoop-site.xml")
    sitefile = file(siteName, 'w')
    print >> sitefile, topElement.toxml()
    sitefile.close()

class hadoopCluster:
  def __init__(self, cfg, log):
    self.__cfg = cfg
    self.__log = log
    self.__changedClusterParams = []

    self.__hostname = local_fqdn()
    self.__svcrgyClient = None
    self.__nodePool = NodePoolUtil.getNodePool(self.__cfg['nodepooldesc'],
                                               self.__cfg, self.__log)
    self.__hadoopCfg = hadoopConfig()
    self.jobId = None
    self.mapredInfo = None
    self.hdfsInfo = None
    self.ringmasterXRS = None

  def __get_svcrgy_client(self):
    svcrgyUrl = to_http_url(self.__cfg['hod']['xrs-address'])
    return hodXRClient(svcrgyUrl)

  def __get_service_status(self):
    serviceData = self.__get_service_data()

    status = True
    hdfs = False
    mapred = False

    for host in serviceData.keys():
      for item in serviceData[host]:
        service = item.keys()
        if service[0] == 'hdfs.grid' and \
          self.__cfg['gridservice-hdfs']['external'] == False:
          hdfs = True
        elif service[0] == 'mapred.grid':
          mapred = True

    if not mapred:
      status = "mapred"

    if not hdfs and self.__cfg['gridservice-hdfs']['external'] == False:
      if status != True:
        status = "mapred and hdfs"
      else:
        status = "hdfs"

    return status

  def __get_service_data(self):
    registry = to_http_url(self.__cfg['hod']['xrs-address'])
    serviceData = self.__svcrgyClient.getServiceInfo(
      self.__cfg['hod']['userid'], self.__setup.np.getNodePoolId())

    return serviceData

  def __check_job_status(self):
    initWaitCount = 20
    count = 0
    status = False
    state = 'Q'
    userLimitsFirstFlag = True

    while state == 'Q':
      if hodInterrupt.isSet():
        raise HodInterruptException()

      jobInfo = self.__nodePool.getJobInfo()
      state = jobInfo['job_state']
      if (state==False) or (state!='Q'):
        break

      count = count + 1
      if count < initWaitCount:
        time.sleep(0.5)
      else:
        time.sleep(10)

      if self.__cfg['hod'].has_key('job-feasibility-attr') and \
                      self.__cfg['hod']['job-feasibility-attr']:
        (status, msg) = self.__isJobFeasible()
        if status == "Never":
          self.__log.critical(TORQUE_USER_LIMITS_EXCEEDED_MSG + msg + \
                "This cluster cannot be allocated now.")
          return -1
        elif status == False:
          if userLimitsFirstFlag:
            self.__log.critical(TORQUE_USER_LIMITS_EXCEEDED_MSG + msg + \
                "This cluster allocation will succeed only after other " + \
                "clusters are deallocated.")
            userLimitsFirstFlag = False

    if state and state != 'C':
      status = True

    return status

  def __isJobFeasible(self):
    return self.__nodePool.isJobFeasible()

  def __get_ringmaster_client(self):
    ringmasterXRS = None

    ringList = self.__svcrgyClient.getServiceInfo(
      self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(),
      'ringmaster', 'hod')

    if ringList and len(ringList):
      if isinstance(ringList, list):
        ringmasterXRS = ringList[0]['xrs']
    else:
      count = 0
      waitTime = self.__cfg['hod']['allocate-wait-time']

      while count < waitTime:
        if hodInterrupt.isSet():
          raise HodInterruptException()

        ringList = self.__svcrgyClient.getServiceInfo(
          self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(),
          'ringmaster', 'hod')

        if ringList and len(ringList):
          if isinstance(ringList, list):
            ringmasterXRS = ringList[0]['xrs']

        if ringmasterXRS is not None:
          break
        else:
          time.sleep(1)
          count = count + 1
          # check to see if the job exited by any chance in that time:
          if (count % 10 == 0):
            if not self.__check_job_status():
              break

    return ringmasterXRS

  def __init_hadoop_service(self, serviceName, xmlrpcClient):
    status = True
    serviceAddress = None
    serviceInfo = None

    for i in range(0, 250):
      try:
        if hodInterrupt.isSet():
          raise HodInterruptException()

        serviceAddress = xmlrpcClient.getServiceAddr(serviceName)
        if serviceAddress:
          if serviceAddress == 'not found':
            time.sleep(.5)
            # check to see if the job exited by any chance in that time:
            if (i % 10 == 0):
              if not self.__check_job_status():
                break
          else:
            serviceInfo = xmlrpcClient.getURLs(serviceName)
            break
      except HodInterruptException,h :
        raise h
      except:
        self.__log.critical("'%s': ringmaster xmlrpc error." % serviceName)
        self.__log.debug(get_exception_string())
        status = False
        break

    if serviceAddress == 'not found' or not serviceAddress:
      self.__log.critical("Failed to retrieve '%s' service address." %
                          serviceName)
      status = False
    elif serviceAddress.startswith("Error: "):
      errs = serviceAddress[len("Error: "):]
      self.__log.critical("Cluster could not be allocated because of the following errors.\n%s" % \
                            errs)
      status = False
    else:
      try:
        self.__svcrgyClient.registerService(self.__cfg['hodring']['userid'],
                                            self.jobId, self.__hostname,
                                            serviceName, 'grid', serviceInfo)
      except HodInterruptException, h:
        raise h
      except:
        self.__log.critical("'%s': registry xmlrpc error." % serviceName)
        self.__log.debug(get_exception_string())
        status = False

    return status, serviceAddress, serviceInfo

  def __collect_jobtracker_ui(self, dir):
    link = self.mapredInfo + "/jobtracker.jsp"
    parser = miniHTMLParser()
    parser.setBaseUrl(self.mapredInfo)
    node_cache = {}

    self.__log.debug("collect_jobtracker_ui seeded with " + link)

    def alarm_handler(number, stack):
      raise AlarmException("timeout")

    signal.signal(signal.SIGALRM, alarm_handler)

    input = None
    while link:
      self.__log.debug("link: %s" % link)
      # taskstats.jsp,taskdetails.jsp not included since too many to collect
      if re.search(
        "jobfailures\.jsp|jobtracker\.jsp|jobdetails\.jsp|jobtasks\.jsp", link):

        for i in range(1,5):
          if hodInterrupt.isSet():
            raise HodInterruptException()
          try:
            input = urllib.urlopen(link)
            break
          except:
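
For reference, the sketch below shows one way hadoopConfig.gen_site_conf could be driven to emit a hadoop-site.xml; it is not part of the original module. All directory paths, host:port addresses and parameter values are hypothetical placeholders, and the import assumes the module lives at hodlib.Hadoop.hadoop as in the HOD source tree.

# Minimal usage sketch (not from the original file); all values are placeholders.
from hodlib.Hadoop.hadoop import hadoopConfig

cfgGen = hadoopConfig()
cfgGen.gen_site_conf(
    confDir='/tmp/hod-conf',                    # directory that receives hadoop-site.xml
    tempDir='/tmp/hod-tmp',                     # becomes hadoop.tmp.dir
    numNodes=4,                                 # cluster size, used to derive mapred.reduce.tasks
    hdfsAddr='namenode.example.com:50001',      # becomes fs.default.name (hdfs:// prefixed)
    mrSysDir='/mapredsystem/hod',               # becomes mapred.system.dir
    mapredAddr='jobtracker.example.com:50002',  # becomes mapred.job.tracker
    clientParams={'io.sort.mb': '100'},         # client-side parameters, lowest precedence
    serverParams={'mapred.child.java.opts': '-Xmx512m'},
    finalServerParams={'dfs.permissions': 'false'},  # written with <final>true</final>
    clusterFactor=1.75)                         # reduce tasks = floor(clusterFactor * (numNodes - 1))

With these arguments the method writes /tmp/hod-conf/hadoop-site.xml; final server parameters and HOD-generated parameters are marked final, while client-side parameters override plain server parameters of the same name.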