Logo Search packages:      
Sourcecode: ipython version File versions  Download package

cocoa_frontend.py

# encoding: utf-8
# -*- test-case-name: IPython.frontend.cocoa.tests.test_cocoa_frontend -*-

"""PyObjC classes to provide a Cocoa frontend to the 
IPython.kernel.engineservice.IEngineBase.

To add an IPython interpreter to a Cocoa app, instantiate an 
IPythonCocoaController in a XIB and connect its textView outlet to an
NSTextView instance in your UI. That's it.

Author: Barry Wark
"""

__docformat__ = "restructuredtext en"

#-----------------------------------------------------------------------------
#       Copyright (C) 2008  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

import sys
import objc
from IPython.external import guid

from Foundation import NSObject, NSMutableArray, NSMutableDictionary,\
                        NSLog, NSNotificationCenter, NSMakeRange,\
                        NSLocalizedString, NSIntersectionRange,\
                        NSString, NSAutoreleasePool
                        
from AppKit import NSApplicationWillTerminateNotification, NSBeep,\
                    NSTextView, NSRulerView, NSVerticalRuler

from pprint import saferepr

import IPython
from IPython.kernel.engineservice import ThreadedEngineService
from IPython.frontend.asyncfrontendbase import AsyncFrontEndBase

from twisted.internet.threads import blockingCallFromThread
from twisted.python.failure import Failure

#-----------------------------------------------------------------------------
# Classes to implement the Cocoa frontend
#-----------------------------------------------------------------------------

# TODO: 
#   1. use MultiEngineClient and out-of-process engine rather than 
#       ThreadedEngineService?
#   2. integrate Xgrid launching of engines
        
class AutoreleasePoolWrappedThreadedEngineService(ThreadedEngineService):
    """ThreadedEngineService that wraps all blocks in an NSAutoreleasePool"""
    
    def wrapped_execute(self, msg, lines):
        """Run the superclass wrapped_execute inside a fresh
        NSAutoreleasePool.
        
        Parameters
        ----------
        msg, lines
            Passed through unchanged to the superclass wrapped_execute.
        
        Result
        ------
        Whatever the superclass wrapped_execute returns. The pool is
        drained before returning, whether or not the call raised.
        """
        # Create the pool *before* entering the try block: if the allocation
        # itself fails there is nothing to drain, and the finally clause
        # must not mask the real error with a NameError.
        pool = NSAutoreleasePool.alloc().init()
        try:
            return super(AutoreleasePoolWrappedThreadedEngineService,
                            self).wrapped_execute(msg, lines)
        finally:
            pool.drain()
    


class Cell(NSObject):
    """
    Representation of the prompts, input and output of a cell in the
    frontend
    """
    
    # Objective-C instance variables, declared through objc.ivar.
    # blockNumber is typed as an unsigned long; the others are untyped.
    blockNumber = objc.ivar().unsigned_long()
    blockID = objc.ivar()
    inputBlock = objc.ivar()
    output = objc.ivar()
     

   
class CellBlock(object):
    """
    Storage for information about text ranges relating to a single cell
    
    Each of the four attributes is either None or a range object with
    ``location`` and ``length`` attributes. The update methods adjust those
    attributes in place.
    """
    
    def __init__(self, inputPromptRange, inputRange=None, outputPromptRange=None,
                outputRange=None):
        """Store the four ranges; every range but the first is optional."""
        super(CellBlock, self).__init__()
        self.inputPromptRange = inputPromptRange
        self.inputRange = inputRange
        self.outputPromptRange = outputPromptRange
        self.outputRange = outputRange
    
    def _ranges(self):
        """Return the ranges that are set (not None), in storage order."""
        return [r for r in (self.inputPromptRange, self.inputRange,
                            self.outputPromptRange, self.outputRange)
                if r is not None]
    
    def update_ranges_for_insertion(self, text, textRange):
        """Update ranges for text insertion at textRange
        
        Parameters
        ----------
        text : string
            The text that replaced the characters in textRange.
        textRange : range
            The replaced range, in pre-edit coordinates. It is read again
            for every stored range, so it should not be one of this
            block's own range objects.
        """
        inserted = len(text)
        for r in self._ranges():
            intersection = NSIntersectionRange(r, textRange)
            if intersection.length == 0: #ranges don't intersect
                if r.location >= textRange.location:
                    # Shift by the net change in text length
                    # (inserted - replaced), not just by len(text). The
                    # max() keeps an empty range that sat inside the
                    # replaced span from landing before the new text.
                    r.location = max(
                        r.location + inserted - textRange.length,
                        textRange.location + inserted)
            else: #ranges intersect
                # NOTE(review): the three overlap cases below are kept
                # exactly as originally written; which side "owns" the
                # replacement text is not evident from this file -- confirm.
                if r.location > textRange.location:
                    offset = inserted - intersection.length
                    r.length -= offset
                    r.location += offset
                elif r.location == textRange.location:
                    r.length += inserted - intersection.length
                else:
                    r.length -= intersection.length
    
    def update_ranges_for_deletion(self, textRange):
        """Update ranges for text deletion at textRange
        
        Parameters
        ----------
        textRange : range
            The deleted range, in pre-deletion coordinates.
        """
        for r in self._ranges():
            intersection = NSIntersectionRange(r, textRange)
            if intersection.length == 0: #ranges don't intersect
                if r.location >= textRange.location:
                    # Slide back by the deleted length, but never to before
                    # the point where the deletion started (an empty range
                    # inside the deleted span collapses onto that point).
                    r.location = max(textRange.location,
                                     r.location - textRange.length)
            else: #ranges intersect
                # The overlapping characters are gone, whichever end of the
                # range they were on: a deletion can only shrink a range.
                r.length -= intersection.length
                if r.location > textRange.location:
                    # The deletion began before this range, so what is left
                    # of the range now starts where the deletion started.
                    r.location = textRange.location
    
    def __repr__(self):
        ranges = (self.inputPromptRange,
                  self.inputRange,
                  self.outputPromptRange,
                  self.outputRange)
        return 'CellBlock(' + str(ranges) + ')'
    


        
class IPythonCocoaController(NSObject, AsyncFrontEndBase):
    """NSObject subclass, mixing in AsyncFrontEndBase, that drives an
    IPython engine from the NSTextView connected to its textView outlet.
    """
    userNS = objc.ivar() #mirror of engine.user_ns (key=>str(value))
    waitingForEngine = objc.ivar().bool()
    textView = objc.IBOutlet()
    
    def init(self):
        """Initialize the Objective-C object and the AsyncFrontEndBase part
        of the controller.
        
        Result
        ------
        The initialized controller, or None if the superclass init
        returned None.
        """
        self = super(IPythonCocoaController, self).init()
        if self is None:
            # The superclass initializer failed; there is no instance on
            # which to run the frontend setup.
            return None
        
        AsyncFrontEndBase.__init__(self,
                    engine=AutoreleasePoolWrappedThreadedEngineService())
        self._common_init()
        
        return self
    
    def _common_init(self):
        """Reset the controller's bookkeeping state to its defaults.
        
        Called from both init and awakeFromNib.
        """
        # Indentation preferences
        self.tabUsesSpaces = True
        self.tabSpaces = 4
        
        # Block bookkeeping
        self.lines = {}
        self.blockRanges = {} # blockID=>CellBlock
        self.currentBlockID = self.next_block_ID()
        
        # State held in the Objective-C ivars
        self.waitingForEngine = False
        self.userNS = NSMutableDictionary.dictionary()
    
    
    def awakeFromNib(self):
        """Start the engine, register for the application-termination
        notification, attach to the text view and show the banner and the
        first prompt.
        """
        self._common_init()
        
        # Start the IPython engine
        self.engine.startService()
        NSLog('IPython engine started')
        
        # Register for app termination
        center = NSNotificationCenter.defaultCenter()
        center.addObserver_selector_name_object_(
            self,
            'appWillTerminate:',
            NSApplicationWillTerminateNotification,
            None)
        
        # Become the text view's delegate and give its scroll view a
        # vertical ruler whose client is the text view.
        textView = self.textView
        textView.setDelegate_(self)
        scrollView = textView.enclosingScrollView()
        scrollView.setHasVerticalRuler_(True)
        self.verticalRulerView = \
            NSRulerView.alloc().initWithScrollView_orientation_(
                textView.enclosingScrollView(),
                NSVerticalRuler)
        self.verticalRulerView.setClientView_(textView)
        
        self._start_cli_banner()
        self.start_new_block()
    
    
    def appWillTerminate_(self, notification):
        """Stop the engine service.
        
        Registered in awakeFromNib as the handler for
        NSApplicationWillTerminateNotification; notification is not used.
        """
        
        self.engine.stopService()
    
    
    def complete(self, token):
        """Complete token in engine's user_ns
        
        Parameters
        ----------
        token : string
            The text to complete; passed unchanged to the engine.
        
        Result
        ------
        Deferred result of 
        IPython.kernel.engineservice.IEngineBase.complete
        """
        
        return self.engine.complete(token)
    
    
    def execute(self, block, blockID=None):
        """Execute block through the superclass implementation.
        
        Sets waitingForEngine and announces a pending change of
        'commandHistory' before delegating; _engine_done undoes both once
        the engine has answered.
        
        Result
        ------
        The object returned by the superclass execute, with _engine_done
        (addBoth) and _update_user_ns (addCallback) attached.
        """
        self.waitingForEngine = True
        self.willChangeValueForKey_('commandHistory')
        
        deferred = super(IPythonCocoaController, self).execute(block, blockID)
        deferred.addBoth(self._engine_done)
        deferred.addCallback(self._update_user_ns)
        return deferred
    
    
    def push_(self, namespace):
        """Push dictionary of key=>values to python namespace
        
        Sets waitingForEngine and announces a pending change of
        'commandHistory'; _engine_done undoes both when the engine is done,
        after which userNS is refreshed. Nothing is returned.
        """
        self.waitingForEngine = True
        self.willChangeValueForKey_('commandHistory')
        
        deferred = self.engine.push(namespace)
        deferred.addBoth(self._engine_done)
        deferred.addCallback(self._update_user_ns)
    
    
    def pull_(self, keys):
        """Pull keys from python namespace
        
        Parameters
        ----------
        keys
            Passed unchanged to self.engine.pull.
        
        Result
        ------
        The value produced by blockingCallFromThread for the engine's
        pull. (Previously the value was computed and then discarded.)
        """
        
        self.waitingForEngine = True
        try:
            return blockingCallFromThread(self.engine.pull, keys)
        finally:
            # Clear the flag even when the pull fails, so the controller
            # does not stay marked as waiting.
            self.waitingForEngine = False
    
    @objc.signature('v@:@I')
    def executeFileAtPath_encoding_(self, path, encoding):
        """Read the file at path and execute its contents in the engine.
        
        Parameters
        ----------
        path : string
            Path of the file to read.
        encoding : unsigned int
            String encoding handed to
            NSString.stringWithContentsOfFile_encoding_error_.
        
        If the file cannot be read, nothing is executed; the failure is
        logged and signalled with a beep.
        """
        
        lines,err = NSString.stringWithContentsOfFile_encoding_error_(
            path,
            encoding,
            None)
        if lines is None:
            # Reading failed (err carries the reason): do not hand None to
            # the engine as if it were source code.
            NSLog('executeFileAtPath:encoding: unable to read file')
            NSBeep()
            return
        
        self.engine.execute(lines)
    
    
    def _engine_done(self, x):
        """Clear waitingForEngine, close the 'commandHistory' change opened
        by execute/push_, and pass x through unchanged.
        
        Attached with addBoth, so x may be a result or a failure.
        """
        self.waitingForEngine = False
        self.didChangeValueForKey_('commandHistory')
        return x
    
    def _update_user_ns(self, result):
        """Update self.userNS from self.engine's namespace
        
        Requests the engine's keys and arranges for their values to be
        fetched; result is returned unchanged so the callback chain
        continues with it.
        """
        self.engine.keys().addCallback(
            self._get_engine_namespace_values_for_keys)
        
        return result
    
    
    def _get_engine_namespace_values_for_keys(self, keys):
        """Pull the values for keys from the engine and arrange for them to
        be stored by _store_engine_namespace_values. Returns nothing.
        """
        self.engine.pull(keys).addCallback(
            self._store_engine_namespace_values, keys=keys)
    
    
    def _store_engine_namespace_values(self, values, keys=None):
        """Store saferepr(value) in self.userNS for each key/value pair.
        
        Parameters
        ----------
        values : sequence
            Values pulled from the engine, in the same order as keys.
        keys : sequence, optional
            Names the values belong to. Omitting it means "no keys".
        """
        if keys is None:
            # None replaces the former mutable default ([]).
            keys = ()
        assert(len(values) == len(keys))
        self.willChangeValueForKey_('userNS')
        try:
            for (k,v) in zip(keys,values):
                self.userNS[k] = saferepr(v)
        finally:
            # Always balance the willChange above, even if storing a value
            # raised part-way through.
            self.didChangeValueForKey_('userNS')
    
    
    def update_cell_prompt(self, result, blockID=None):
        """Write the input prompt for result into the prompt range of the
        block identified by blockID.
        
        A Failure gets the un-numbered prompt; any other result gets the
        prompt for result['number']. result is returned unchanged.
        """
        # Debugging aid: dump the current block ranges to stdout.
        print(self.blockRanges)
        
        promptArgs = {}
        if not isinstance(result, Failure):
            promptArgs['number'] = result['number']
        prompt = self.input_prompt(**promptArgs)
        
        self.insert_text(prompt,
                textRange=self.blockRanges[blockID].inputPromptRange,
                scrollToVisible=False)
        
        return result
    
    
    def render_result(self, result):
        """Insert the output prompt and the pretty-printed display of
        result directly after the input of the block result['blockID'].
        
        The block is removed from self.blockRanges. result is returned
        unchanged.
        """
        inputRange = self.blockRanges.pop(result['blockID']).inputRange
        
        prompt = self.output_prompt(number=result['number'])
        display = result.get('display',{}).get('pprint','')
        text = '\n' + prompt + display + '\n\n'
        
        # Zero-length range just past the end of the block's input
        insertionPoint = NSMakeRange(inputRange.location+inputRange.length,
                                    0)
        self.insert_text(text, textRange=insertionPoint)
        return result
    
        
    def render_error(self, failure):
        """Insert the output prompt and the failure's error message
        directly after the input of the block failure.blockID, then start
        a new block.
        
        failure is returned unchanged.
        """
        # Debugging aid: echo the failure to stdout.
        print(failure)
        
        inputRange = self.blockRanges[failure.blockID].inputRange
        text = ('\n' +
                self.output_prompt() +
                '\n' +
                failure.getErrorMessage() +
                '\n\n')
        
        # Zero-length range just past the end of the block's input
        insertionPoint = NSMakeRange(inputRange.location + inputRange.length,
                                    0)
        self.insert_text(text, textRange=insertionPoint)
        self.start_new_block()
        return failure
    
    
    def _start_cli_banner(self):
        """Insert the banner line, followed by a blank line, at the end of
        the text view."""
        banner = "IPython1 %s -- An enhanced Interactive Python." % \
                    IPython.__version__
        self.insert_text(banner + '\n\n')
    
    
    def start_new_block(self):
        """Make a fresh block the current one and insert its input
        prompt.
        
        A new block ID is generated, a CellBlock located at the end of the
        text is registered under it in self.blockRanges, and the input
        prompt is inserted at that block's inputPromptRange.
        """
        blockID = self.next_block_ID()
        block = self.new_cell_block()
        
        self.currentBlockID = blockID
        self.blockRanges[blockID] = block
        self.insert_text(self.input_prompt(),
            textRange=block.inputPromptRange)
    
    
    
    def next_block_ID(self):
        """Return a new block ID obtained from guid.generate()."""
        
        return guid.generate() 
    
    def new_cell_block(self):
        """A new CellBlock at the end of self.textView.textStorage()
        
        Both the input prompt range and the input range are zero-length
        and located at the current end of the text.
        """
        end = self.textView.textStorage().length()
        promptRange = NSMakeRange(end, 0)
        inputRange = NSMakeRange(end, 0)
        return CellBlock(promptRange, inputRange)
    
    
    def current_block_range(self):
        """Return the CellBlock registered for self.currentBlockID.
        
        If none is registered, a new CellBlock at the end of the text is
        returned instead. That fallback is NOT stored in self.blockRanges.
        """
        try:
            return self.blockRanges[self.currentBlockID]
        except KeyError:
            # Build the fallback only when it is needed, rather than
            # eagerly on every call as a dict.get default.
            return self.new_cell_block()
    
    def current_block(self):
        """The current block's text
        
        This is the text covered by the current block's inputRange.
        """
        
        return self.text_for_range(self.current_block_range().inputRange)
    
    def text_for_range(self, textRange):
        """Return the substring of the text view's text storage covered by
        textRange."""
        storage = self.textView.textStorage()
        fullText = storage.string()
        return fullText.substringWithRange_(textRange)
    
    def current_line(self):
        """Return the last line of the current block's input text."""
        inputRange = self.current_block_range().inputRange
        lines = self.text_for_range(inputRange).split('\n')
        return lines[-1]
    
    
    def insert_text(self, string=None, textRange=None, scrollToVisible=True):
        """Insert text into textView at textRange, updating blockRanges 
        as necessary
        
        Parameters
        ----------
        string : string, optional
            Text that replaces the characters in textRange. With None
            nothing is inserted and the text view is left untouched.
        textRange : range, optional
            Range to replace. Defaults to a zero-length range at the end
            of the text.
        scrollToVisible : bool
            Whether to scroll textRange into view afterwards.
        """
        if string is None:
            # Nothing to insert. The range updates below call len() on the
            # text, which would raise for None.
            return
        
        if textRange is None:
            #range for end of text
            textRange = NSMakeRange(self.textView.textStorage().length(), 0)
        
        self.textView.replaceCharactersInRange_withString_(
            textRange, string)
        
        for block in self.blockRanges.values():
            block.update_ranges_for_insertion(string, textRange)
        
        self.textView.setSelectedRange_(textRange)
        if scrollToVisible:
            self.textView.scrollRangeToVisible_(textRange)
    
    
    
    def replace_current_block_with_string(self, textView, string):
        """Replace the current block's input text with string, then move
        the selection (and scroll) to the end of the text.
        
        The current block's inputRange length is set to len(string);
        ranges of other blocks are not adjusted here.
        """
        inputRange = self.current_block_range().inputRange
        textView.replaceCharactersInRange_withString_(inputRange, string)
        inputRange.length = len(string)
        
        endOfText = NSMakeRange(textView.textStorage().length(), 0)
        textView.scrollRangeToVisible_(endOfText)
        textView.setSelectedRange_(endOfText)
    
    
    def current_indent_string(self):
        """returns string for indent or None if no indent
        
        The indent is computed by _indent_for_block from the current
        block's text.
        """
        
        return self._indent_for_block(self.current_block())
    
    
    def _indent_for_block(self, block):
        """Return the indent string for continuing block, or None.
        
        Parameters
        ----------
        block : string
            Text of the block.
        
        Result
        ------
        None when block is a single line. Otherwise the indent is the
        leading whitespace width of the last line, or self.tabSpaces when
        that line is not indented. It is returned as spaces when
        self.tabUsesSpaces is true and as tabs otherwise.
        """
        lines = block.split('\n')
        if len(lines) <= 1:
            return None
        
        lastLine = lines[-1]
        currentIndent = len(lastLine) - len(lastLine.lstrip())
        if currentIndent == 0:
            currentIndent = self.tabSpaces
        
        if self.tabUsesSpaces:
            return ' ' * currentIndent
        
        # Explicit floor division: the tab count must be an integer
        # whatever division semantics are in effect.
        return '\t' * (currentIndent // self.tabSpaces)
    
    
    # NSTextView delegate methods...
    def textView_doCommandBySelector_(self, textView, selector):
        """NSTextView delegate method: intercept editing commands.
        
        Parameters
        ----------
        textView : NSTextView
            Must be self.textView.
        selector : string
            Name of the command selector, e.g. 'insertNewline:'.
        
        Result
        ------
        True when the command was handled here, False when the text view
        should carry out its default action.
        
        Raises
        ------
        NotImplementedError
            For 'deleteToEndOfParagraph:' when the selection starts at or
            before the start of the current block's input.
        """
        assert(textView == self.textView)
        NSLog("textView_doCommandBySelector_: "+selector)
        
        if selector == 'insertNewline:':
            if self.is_complete(self.current_block()):
                self.execute(self.current_block(),
                                blockID=self.currentBlockID)
                self.start_new_block()
                return True
            
            return False
        
        elif selector == 'moveUp:':
            prevBlock = self.get_history_previous(self.current_block())
            if prevBlock is not None:
                self.replace_current_block_with_string(textView, prevBlock)
            else:
                NSBeep()
            return True
        
        elif selector == 'moveDown:':
            nextBlock = self.get_history_next()
            if nextBlock is not None:
                self.replace_current_block_with_string(textView, nextBlock)
            else:
                NSBeep()
            return True
        
        elif selector == 'moveToBeginningOfParagraph:':
            inputRange = self.current_block_range().inputRange
            textView.setSelectedRange_(NSMakeRange(inputRange.location, 0))
            return True
        
        elif selector == 'moveToEndOfParagraph:':
            inputRange = self.current_block_range().inputRange
            textView.setSelectedRange_(NSMakeRange(
                            inputRange.location + inputRange.length, 0))
            return True
        
        elif selector == 'deleteToEndOfParagraph:':
            # The start of the editable text is the block's inputRange;
            # the CellBlock itself has no location attribute.
            if(textView.selectedRange().location <=
                self.current_block_range().inputRange.location):
                # NotImplemented is a constant, not an exception class.
                raise NotImplementedError()
            
            return False # don't actually handle the delete
        
        elif selector == 'insertTab:':
            if len(self.current_line().strip()) == 0: #only white space
                return False
            else:
                self.textView.complete_(self)
                return True
        
        elif selector == 'deleteBackward:':
            selection = textView.selectedRange()
            #if we're at the beginning of the current block, ignore
            if(selection.location ==
                self.current_block_range().inputRange.location):
                return True
            
            if selection.length == 0:
                if selection.location == 0:
                    # Caret at the very start of the text: there is no
                    # character before it, so no range needs updating.
                    return False
                # A plain backspace removes the character before the caret.
                deleteRange = NSMakeRange(selection.location - 1, 1)
            else:
                deleteRange = selection
            
            for block in self.blockRanges.values():
                block.update_ranges_for_deletion(deleteRange)
            return False # let the text view perform the deletion
        
        return False
    
    
    def textView_shouldChangeTextInRanges_replacementStrings_(self, 
        textView, ranges, replacementStrings):
        """
        Delegate method for NSTextView.
        
        Refuse change text in ranges not at end, but make those changes at 
        end.
        
        Result
        ------
        True when the text view may apply the change itself. False when
        at least one range lay before the current block's input, in which
        case its replacement string has been inserted at the end instead.
        """
        
        if replacementStrings is None:
            # Cocoa passes nil when only text attributes are changing.
            # There is no text to redirect, so allow the change.
            return True
        
        assert(len(ranges) == len(replacementStrings))
        allow = True
        for rangeValue, replacement in zip(ranges, replacementStrings):
            changeRange = rangeValue.rangeValue()
            if(textView.textStorage().length() > 0 and
                changeRange.location <
                    self.current_block_range().inputRange.location):
                self.insert_text(replacement)
                allow = False
        
        return allow
    
    def textView_completions_forPartialWordRange_indexOfSelectedItem_(self, 
        textView, words, charRange, index):
        """
        Delegate method for NSTextView.
        
        Ask the engine to complete the text in charRange.
        
        Result
        ------
        A tuple (completions, 0). completions is objc.nil, and a beep is
        sounded, when the completions could not be obtained.
        """
        try:
            ts = textView.textStorage()
            token = ts.string().substringWithRange_(charRange)
            completions = blockingCallFromThread(self.complete, token)
        except Exception:
            # Best effort: any ordinary error means "no completions".
            # Unlike a bare except, this lets KeyboardInterrupt and
            # SystemExit propagate.
            completions = objc.nil
            NSBeep()
        
        return (completions,0)
    


Generated by  Doxygen 1.6.0   Back to index