LOGIN   :::   RECOVER PASS   :::   GET ACCOUNT    
Browse
  • Projects
  • Code (CVS)
  • Forums
  • News
  • Articles
  • Polls
  •  
    OpenCores
  • FAQ
  • CVS HowTo
  • Mission
  • Media
  • Tools
  • Advertise
  • Mirrors
  • Logos
  • Contact us
  • Job Opportunity
  •  
    Tools
  • Search
      
  • Download Cores (CVSGet)
  •  
    More
  • Wishbone
  • Perlilog
  • EDA tools
  • OpenTech CD
  •  
    Navigation: All forums > Cvs-checkins > Message List > Message Post

    Message

    Reply | Reply all
    Date Prev | Date Next | Thread Prev | Thread Next Date Index | Thread Index

    From: cvs at opencores.org<cvs@o...>
    Date: Tue Jul 22 14:14:55 CEST 2008
    Subject: [cvs-checkins] MODIFIED: jop ...
    Top
    Date: 00/08/07 22:14:14

    Added: jop/xilinx/ml401a/vlab vlab.py
    Log:
    Initial Revision


    Revision Changes Path
    1.1 jop/xilinx/ml401a/vlab/vlab.py

    http://www.opencores.org/cvsweb.shtml/jop/xilinx/ml401a/vlab/vlab.py?rev=1.1&content-type=text/x-cvsweb-markup

    Index: vlab.py
    ===================================================================
    #
    # vlab.py
    #
    # Source code for non-generated parts of virtual lab module.
    #
    # Author: Jack Whitham
    # RCS: $Id: vlab.py,v 1.1 2008/07/22 12:14:55 jwhitham Exp $
    #

    import fcntl, os, popen2, time, atexit, signal, zlib
    from vlaberr import *
    from vlabmsg import *

    # Client version string; sent to the relay server in the greeting
    # message of VlabConn.Connect and stored as VlabVersion.client.
    CVERSION = '0.1'
    # Seconds slept between receive polls in VlabConn.ReceiveMessage; the
    # same amount is subtracted from the remaining timeout on each poll.
    SHORT_TIMEOUT = 0.1

    class VlabException(Exception):
        """Base class for the virtual lab client errors defined in this module."""

    class VlabConnectionException(VlabException):
        """Raised when the link to the server is missing or an I/O on it fails."""

    class VlabErrorException(VlabException):
        """Error reported for a command, carrying a code and a readable reason.

        Attributes:
            errorcode: the error identifier passed by the raiser (the rest of
                this module passes ERR_* values).
        """

        def __init__(self, errorcode, reason):
            # Keep the code separately; the reason becomes the exception text.
            self.errorcode = errorcode
            VlabException.__init__(self, reason)

    class VlabBitfileException(Exception):
        """Raised when an uploaded bitfile is reported as not validating.

        NOTE(review): derives directly from Exception, so handlers catching
        VlabException will not see it -- confirm this is intentional.
        """

    # Distinct integer codes, numbered 0..11 in the order listed:
    #   BINFO_* - VlabBitInfo.status values
    #   UINFO_* - VlabUserInfo.status values
    #   VLAB_*  - Vlab.stage values
    (
        BINFO_UNUSED, BINFO_INVALID, BINFO_INCOMPLETE, BINFO_READY,
        BINFO_WRONG_FPGA,
        UINFO_IN_USE, UINFO_OFFLINE, UINFO_AVAILABLE, UINFO_UNKNOWN,
        VLAB_NOT_CONNECTED, VLAB_BOARD_SERVER, VLAB_RELAY_SERVER,
    ) = range(12)

    class VlabBitInfo:
        """Description of one bitfile slot as reported by the server.

        The fields are filled in by Vlab.ClReceiveBitInfo; `status` is one of
        the BINFO_* codes.
        """

        def __init__(self):
            # A fresh record describes an unused slot.
            self.status = BINFO_UNUSED
            self.bid = -1
            self.size_bits = ""
            self.design_name = ""
            self.part_name = ""
            self.syn_date = ""
            self.syn_time = ""

        def __str__(self):
            """Return a short label, or a design summary for any other status."""
            fixed_labels = (
                (BINFO_UNUSED, "unused"),
                (BINFO_INVALID, "invalid"),
                (BINFO_INCOMPLETE, "incomplete"),
            )
            for code, label in fixed_labels:
                if self.status == code:
                    return label
            # Every remaining status is rendered from the design fields.
            summary = (self.design_name, self.syn_date,
                       self.syn_time, self.part_name)
            return "%s (%s %s) for %s" % summary

    class VlabUserInfo:
        """Availability record for one board entry.

        The fields are filled in by Vlab.ClUserInfo; `status` is one of the
        UINFO_* codes.
        """

        def __init__(self):
            self.status = UINFO_UNKNOWN
            self.user_name = None
            self.lock_count = 0

        def __str__(self):
            """Return "available", "offline", the user name, or "?" if unknown."""
            if self.status == UINFO_AVAILABLE:
                return "available"
            if self.status == UINFO_OFFLINE:
                return "offline"
            if self.user_name is None:
                return "?"
            return self.user_name

    class VlabVersion:
        """Versions of the client, the relay server and the embedded board.

        `relay` and `embedded` stay None until Vlab.ClGetRelayVersion and
        Vlab.ClGetEmbeddedVersion store a value.
        """

        def __init__(self):
            self.client = CVERSION
            self.relay = None
            self.embedded = None

        def Lowest(self):
            """Return the smallest known version (None entries are skipped).

            Values are compared with their default ordering, exactly as
            stored.
            """
            known = []
            for version in (self.client, self.relay, self.embedded):
                if version != None:
                    known.append(version)
            return sorted(known)[0]

        def __str__(self):
            lowest = self.Lowest()
            return str(lowest)

    class VlabPlatform: def __init__(self): self.board_info = None self.fpga_count = 0 self.driver_name = None self.fpga_name = None class VlabConn(VlabBase): def __init__(self): self.message_buffer = [] self.send_fd = None self.receive_fd = None self.ssh = None self.debug = False self.timeout = 10.0 atexit.register(self.Disconnect) def Connect(self, relay_server, user_name, ssh_port=None, private_key_file=None): cmd_list = [ 'ssh' ] if ( ssh_port != None ): cmd_list.extend(['-p', '%u' % ssh_port]) if ( private_key_file != None ): cmd_list.extend(['-i', private_key_file]) cmd_list.extend(['-l', user_name ]) cmd_list.append(relay_server) if ( self.debug ): print 'Running command', cmd_list self.Disconnect() self.ssh = popen2.Popen3(cmd_list) self.receive_fd = self.ssh.fromchild self.send_fd = self.ssh.tochild self.__SetBlocking(False) self.SendMessage(MSG_REM, "Client version %s connected." % CVERSION) self.ReceiveMessage() def Disconnect(self): if ( self.receive_fd != None): try: self.receive_fd.close() except: pass if ( self.send_fd != None ): try: self.send_fd.close() except: pass if ( self.ssh != None ): try: os.kill(self.ssh.pid, signal.SIGTERM) time.sleep(0.05) os.kill(self.ssh.pid, signal.SIGTERM) time.sleep(0.05) os.kill(self.ssh.pid, signal.SIGKILL) except: pass self.ssh.wait() self.receive_fd = self.send_fd = self.ssh = None def __SetBlocking(self, blocking): b = os.O_NONBLOCK if ( blocking ): b = 0 fcntl.fcntl(self.receive_fd.fileno(), fcntl.F_SETFL, b) def UartMode(self, blocking): self.__SetBlocking(blocking) out = (self.receive_fd, self.send_fd) self.receive_fd = self.send_fd = None return out def __CheckConn(self): if (( None in (self.receive_fd, self.send_fd) ) or self.receive_fd.closed or self.send_fd.closed or ( self.ssh == None )): raise VlabConnectionException("Not connected.") def SendMessage(self, tag, *parameters): self.__CheckConn() msg = '%s %s\n' % (tag, ' '.join([ str(p) for p in parameters])) if ( self.debug ): print 'Send', msg try: 
self.send_fd.write(msg) self.send_fd.flush() except IOError: raise VlabConnectionException("Sending message failed.") def ReceiveMessage(self): # Receive everything available now rx = self.__ReceiveMessage() t = self.timeout while (( not rx ) and ( t >= 0.0 )): # Wait for timeout, try again time.sleep(SHORT_TIMEOUT) rx = self.__ReceiveMessage() t -= SHORT_TIMEOUT # Empty buffer if something was received while ( rx ): rx = self.__ReceiveMessage() def __ReceiveMessage(self): self.__CheckConn() try: buffer = self.receive_fd.read(512) except IOError: buffer = '' for ch in buffer: if ( ch in "\r\n" ): self.__ProcessMessage(''.join(self.message_buffer)) self.message_buffer = [] elif ( ch in '\x08\x7f' ): if ( len(self.message_buffer) != 0 ): self.message_buffer.pop() else: if ( len(self.message_buffer) < 1024 ): self.message_buffer.append(ch) return ( len(buffer) != 0 ) def __ProcessMessage(self, msg): if ( self.debug ): print 'Receive', msg fields = msg.split() if ( len(fields) == 0 ): return False command = fields[ 0 ].lower() tbl_entry = MSG_TABLE.get(command, None) if ( tbl_entry == None ): return False get_all_remaining = tbl_entry[ 0 ] if ( get_all_remaining ): # Resplit - put all of final parameter into one string. 
fields = msg.split(None, len(tbl_entry) - 2) if ( self.debug ): print 'Received fields: ', fields handler_fn = getattr(self, tbl_entry[ 1 ], None) assert ( handler_fn != None ) params = [] for (value, conv_fn) in zip(fields[ 1: ], tbl_entry[ 2: ]): params.append(conv_fn(value)) handler_fn(*params) return True class Version: pass class Vlab(VlabConn): def __init__(self): VlabConn.__init__(self) self.result = ERR_NONE self.version = VlabVersion() self.platform = VlabPlatform() self.userinfo_table = dict() self.bitinfo_table = dict() self.stage = VLAB_NOT_CONNECTED def SendCommand(self, tag, *parameters): self.SendMessage(tag, *parameters) self.result = ERR_NORESP self.ReceiveMessage() if ( self.result != ERR_NONE ): raise VlabErrorException(self.result, self.FindError(self.result)) def FindError(self, name): return ERR_TABLE.get(name, ERR_TABLE[ ERR_UNKNOWN ]) def UseUart(self, uart_number, blocking): self.SendCommand(MSG_USEUART, uart_number) return self.UartMode(blocking) def SetUart(self, uart_number, baud): self.SendCommand(MSG_SETUART, uart_number, baud) def SendBitfile(self, bits, compress_level=9): # Begin by compressing the bit file # (You might be sending the file over a slow network connection) bits = zlib.compress(bits, compress_level) size_bits = len(bits) * 8 self.load_bid = None self.load_ok = None # See ClLoadReady self.SendCommand(MSG_LOADBITS, size_bits) if ( self.load_bid == None ): raise VlabErrorException(self.result, "Bitfile id (BID) was not received.") bid_copy = self.load_bid try: self.send_fd.write(bits) self.send_fd.flush() except IOError: raise VlabConnectionException("Sending bitstream data failed.") # Null comment - should get us a Loaded acknowledgement self.SendCommand(MSG_REM) # What was the result? 
See ClLoaded if ( self.load_ok == None ): raise VlabConnectionException("Load completion " "message did not arrive.") elif ( bid_copy != self.load_bid ): raise VlabErrorException(ERR_UNKNOWN, "Bitfile id (BID) has changed!") elif ( not self.load_ok ): raise VlabBitfileException("Bitfile did not validate.") return bid_copy def ProgramFPGA(self, fpga_num, bid): # Schedule programming self.load_bid = bid self.SendCommand(MSG_PROGRAM, fpga_num, bid) # Wait for a response - this might take longer than # self.timeout because (1) the FPGA might take ages to # program, and (2) there might be a whole lot of other # things in the queue self.result = ERR_NORESP while ( self.result == ERR_NORESP ): self.SendMessage(MSG_REM) self.ReceiveMessage() if ( self.result != ERR_NONE ): raise VlabErrorException(self.result, self.FindError(self.result)) def GetBitInfo(self): self.bitinfo_table = dict() self.SendCommand(MSG_SHOWBITS) out = [] i = 0 while ( self.bitinfo_table.has_key(i) ): out.append(self.bitinfo_table[ i ]) i += 1 return out def GetUserInfo(self, board_name): if ( self.stage != VLAB_RELAY_SERVER ): raise VlabException("This command only works while you " "are connected to the relay server (but not a board).") self.userinfo_table = dict() self.SendCommand(MSG_SHOWBITS, board_name) def ClUsingUart(self): self.result = ERR_NONE def ClOk(self, explanation): self.result = ERR_NONE def ClEndList(self): self.result = ERR_NONE def ClError(self, shortname, explanation): self.result = shortname def ClWall(self, english): print 'Wall received:', english def ClGetEmbeddedVersion(self, version, english): print 'Board version:', version, english self.version.embedded = version def ClGetRelayVersion(self, version, english): print 'Relay server version:', version, english self.version.relay = version def ClReceiveBoardInfo(self, english): print 'Board:', english self.platform.board_info = english def ClReceiveFPGAInfo(self, fpga_count, driver_name, fpga_name): self.platform.fpga_count = 
fpga_count self.platform.driver_name = driver_name self.platform.fpga_name = fpga_name def ClLoadReady(self, bid, num_bits, english): self.result = ERR_NONE self.load_bid = bid def ClLoaded(self, bid, ok): self.result = ERR_NONE self.load_bid = bid self.load_ok = ok def ClReceiveBitInfo(self, index, bid, size_bits, design_name, part_name, syn_date, syn_time, english): try: binfo = self.bitinfo_table[ index ] except KeyError: binfo = self.bitinfo_table[ index ] = VlabBitInfo() binfo.bid = bid binfo.size_bits = size_bits binfo.design_name = design_name binfo.part_name = part_name binfo.syn_date = syn_date binfo.syn_time = syn_time if ( binfo.bid <= 0 ): binfo.status = BINFO_UNUSED elif ( binfo.size_bits <= 0 ): if ( binfo.syn_time == 'invalid' ): binfo.status = BINFO_INVALID elif ( binfo.syn_time == 'incomplete' ): binfo.status = BINFO_INCOMPLETE elif ( binfo.syn_time == 'wrong' ): binfo.status = BINFO_WRONG_FPGA else: binfo.status = BINFO_INVALID else: binfo.status = BINFO_READY def Connect(self, relay_server, board_name=None, ssh_port=None, user_name=None, private_key_file=None): VlabConn.Connect(self, relay_server=relay_server, ssh_port=ssh_port, user_name=user_name, private_key_file=private_key_file) self.stage = VLAB_RELAY_SERVER if ( board_name != None ): self.ConnectBoard(board_name) def Disconnect(self): VlabConn.Disconnect(self) self.stage = VLAB_NOT_CONNECTED def ConnectBoard(self, board_name): if ( self.stage != VLAB_RELAY_SERVER ): raise VlabException("Connect() to a relay server first.") retry = True while ( retry and self.stage == VLAB_RELAY_SERVER ): retry = False try: self.SendCommand(MSG_CONNECT, board_name) self.stage = VLAB_BOARD_SERVER # finished except VlabErrorException, e: if ( e.errorcode == ERR_UNAVAILABLE ): # try again - there may be more than one instance # of this board, and another might be available. # we will see ERR_BUSY when we run out of boards. 
retry = True else: raise e def ClProgramOk(self, bid): # Check bid - very important to do this because # other ProgramOk messages might get through if ( bid == self.load_bid ): self.result = ERR_NONE def ClProgramFailed(self, bid, shortname): # Check bed - see ClProgramOk if ( bid == self.load_bid ): self.result = shortname def ClUserInfo(self, index, valid, user_name, lock_count): try: uinfo = self.userinfo_table[ index ] except KeyError: uinfo = self.userinfo_table[ index ] = VlabUserInfo() uinfo.lock_count = lock_count uinfo.user_name = None if ( valid ): uinfo.user_name = user_name uinfo.status = UINFO_IN_USE elif ( user_name == "offline" ): uinfo.status = UINFO_OFFLINE else: uinfo.status = UINFO_AVAILABLE

     
    Copyright (c) 1999 OPENCORES.ORG. All rights reserved.