• Home
  • Module code
  • wte
  • wte.views
  • wte.views.part

Source code for wte.views.part

# -*- coding: utf-8 -*-
"""
#####################################
:mod:`wte.views.part` -- Part Backend
#####################################

The :mod:`~wte.views.part` module provides the functionality for
creating, viewing, editing, and deleting :class:`~wte.models.Part`.

Routes are defined in :func:`~wte.views.part.init`.

.. moduleauthor:: Mark Hall <mark.hall@work.room3b.eu>
"""
from nine import (IS_PYTHON2, str, native_str, nimport)  # Python 2.7 compatibility

import formencode
import math
import json
import re
import transaction

from pyramid.httpexceptions import (HTTPSeeOther, HTTPNotFound, HTTPForbidden)
from pyramid.response import Response
from pyramid.renderers import render_to_response
from pyramid.view import view_config
from pywebtools.formencode import State, CSRFSchema
from pywebtools.pyramid.auth.decorators import unauthorised_redirect, require_logged_in
from pywebtools.pyramid.auth.views import current_user
from pywebtools.pyramid.decorators import require_method
from pywebtools.sqlalchemy import DBSession
from pkg_resources import resource_string
from sqlalchemy import and_, distinct
from zipfile import ZipFile, ZIP_DEFLATED, ZIP_STORED, BadZipfile

from wte.models import (Part, UserPartRole, Asset, UserPartProgress, User,
                        Quiz, QuizAnswer)
from wte.text_formatter import compile_rst
from wte.util import (ordered_counted_set, send_email, get_config_setting)
from wte.views.quiz import extract_quizzes

BytesIO = nimport('io:BytesIO')


[docs]def init(config): """Adds the part-specific backend routes (route name, URL pattern handler): * ``part.list`` -- ``/parts`` -- :func:`~wte.views.part.list_parts` * ``part.new`` -- ``/parts/new`` -- :func:`~wte.views.part.new` * ``part.search`` -- ``/parts/search`` -- :func:`~wte.views.part.search` * ``part.import`` -- ``/parts/import`` -- :func:`~wte.views.part.import_file` * ``part.view`` -- ``/parts/{pid}`` -- :func:`~wte.views.frontend.view_part` * ``part.edit`` -- ``/parts/{pid}/edit`` -- :func:`~wte.views.part.edit` * ``part.delete`` -- ``/parts/{pid}/delete`` -- :func:`~wte.views.part.delete` * ``part.delete`` -- ``/parts/{pid}/delete`` -- :func:`~wte.views.part.delete` * ``part.preview`` -- ``/parts/{pid}/rst_preview`` -- :func:`~wte.views.part.preview` * ``part.register`` -- ``/parts/{pid}/register`` -- :func:`~wte.views.part.register` * ``part.register.settings`` -- ``/parts/{pid}/register/settings`` -- :func:`~wte.views.part.edit_register_settings` * ``part.deregister`` -- ``/parts/{pid}/deregister`` -- :func:`~wte.views.part.deregister` * ``part.change_status`` -- ``/parts/{pid}/change_status`` -- :func:`~wte.views.part.change_status` * ``part.export`` -- ``/parts/{pid}/export`` -- :func:`~wte.views.part.export` * ``part.download`` -- ``/parts/{pid}/download`` -- :func:`~wte.views.part.download` * ``part.reset-files`` -- ``/parts/{pid}/reset_files`` -- :func:`~wte.views.part.reset_files` * ``part.progress.download`` -- ``/parts/{pid}/progress/download`` -- :func:`~wte.views.part.download_part_progress` * ``part.progress.update`` -- ``/parts/{pid}/progress/update`` -- :func:`~wte.views.part.update_part_progress` """ config.add_route('part.list', '/parts') config.add_route('part.new', '/parts/new/{new_type}') config.add_route('part.import', '/parts/import') config.add_route('part.search', '/parts/search') config.add_route('part.view', '/parts/{pid}') config.add_route('part.edit', '/parts/{pid}/edit') config.add_route('part.delete', '/parts/{pid}/delete') config.add_route('part.preview', '/parts/{pid}/rst_preview') config.add_route('part.register', '/parts/{pid}/register') config.add_route('part.register.settings', '/parts/{pid}/register/settings') config.add_route('part.deregister', '/parts/{pid}/deregister') config.add_route('part.change_status', '/parts/{pid}/change_status') config.add_route('part.export', '/parts/{pid}/export') config.add_route('part.download', '/parts/{pid}/download') config.add_route('part.reset-files', '/parts/{pid}/reset_files') config.add_route('part.progress.download', '/parts/{pid}/progress/download') config.add_route('part.progress.update', '/parts/{pid}/progress/update')
[docs]def get_user_part_progress(dbsession, user, part): """Returns the :class:`~wte.models.UserPartProgress` for the given ``user`` and ``part. If none exists, then a new one is instantiated. If the :class:`~wte.models.UserPartProgress` points to a current page that is different to ``page``, then the :class:`~wte.models.UserPartProgress` is updated. :param user: The user to get the progress for :type user: :class:`~wte.models.User` :param part: The part to get the progress for :type part: :class:`~wte.models.Part` :return: The :class:`~wte.models.UserPartProgress` :rtype: :class:`~wte.models.UserPartProgress` """ if part.type == 'part': progress = dbsession.query(UserPartProgress).\ filter(and_(UserPartProgress.user_id == user.id, UserPartProgress.part_id == part.id)).first() if not progress: progress = UserPartProgress(user_id=user.id, part_id=part.id) elif part.type == 'page': progress = dbsession.query(UserPartProgress).\ filter(and_(UserPartProgress.user_id == user.id, UserPartProgress.part_id == part.parent.id)).first() if not progress: progress = UserPartProgress(user_id=user.id, part_id=part.parent.id, current_id=part.id) else: progress = None if progress: with transaction.manager: dbsession.add(progress) dbsession.add(part) if progress.visited is None: progress.visited = {} if part.type == 'page': if str(part.id) not in progress.visited: progress.visited[str(part.id)] = {'duration': 0} templates = part.parent.templates progress.current_id = part.id else: templates = part.templates for template in templates: found = False for user_file in progress.files: if user_file.filename == template.filename and user_file.mimetype == template.mimetype: found = True break if not found: user_file = Asset(filename=template.filename, mimetype=template.mimetype, order=template.order, data=template.data, type='file', etag=template.etag) dbsession.add(user_file) progress.files.append(user_file) for template in templates: for user_file in progress.files: if user_file.filename == template.filename and user_file.mimetype == template.mimetype: user_file.order = template.order break dbsession.add(part) dbsession.add(progress) dbsession.add(user) return progress
@view_config(route_name='part.list', renderer='wte:templates/part/list.kajiki') @current_user()
[docs]def list_parts(request): """Handles the ``/parts`` URL, displaying modules. If no parameters are specified, lists all available modules. If a "user_id" parameter is specified, lists all modules for that user. """ dbsession = DBSession() if 'user_id' in request.params: user = dbsession.query(User).filter(User.id == request.params['user_id']).first() if user and (user.id == request.current_user.id or request.current_user.has_permission('admin.users.view')): if user.id == request.current_user.id: title = 'My Modules' missing = 'You have not registered for any modules.' else: title = '%s\'s Modules' % user.display_name missing = '%s has not registered for any modules.' % user.display_name parts = dbsession.query(Part).join(UserPartRole) # Status filter available_status = [s[0] for s in dbsession.query(distinct(Part.status)).join(UserPartRole). filter(UserPartRole.user_id == request.params['user_id']).all()] if 'status' in request.params: status = request.params.getall('status') else: if dbsession.query(Part).\ join(UserPartRole).filter(and_(Part.type == 'module', Part.status == 'archived', UserPartRole.user_id == request.params['user_id'])).\ count() <= 10: status = available_status else: status = ['available', 'unavailable'] parts = parts.filter(and_(Part.type == 'module', Part.status.in_(status), UserPartRole.user_id == request.params['user_id'])).order_by(Part.title) crumbs = [{'title': 'My Modules', 'url': request.route_url('part.list', _query={'user_id': request.params['user_id']}), 'current': True}] help_path = ['user', 'learner', 'my_modules.html'] else: raise HTTPNotFound() else: title = 'Available Modules' parts = dbsession.query(Part).filter(and_(Part.type == 'module', Part.status == 'available')).order_by(Part.title).all() missing = 'There are currently no modules available.' crumbs = [{'title': 'Modules', 'url': request.route_url('part.list'), 'current': True}] status = [] available_status = [] help_path = ['user', 'learner', 'modules.html'] return {'parts': parts, 'title': title, 'missing': missing, 'crumbs': crumbs, 'available_status': available_status, 'selected_status': status, 'help': help_path}
@view_config(route_name='part.view') @current_user() @require_logged_in()
[docs]def view_part(request): """Handles the ``parts/{pid}`` URL, displaying the :class:`~wte.models.Part`. Requires that the user has "view" rights on the :class:`~wte.models.Part`. """ dbsession = DBSession() part = dbsession.query(Part).filter(Part.id == request.matchdict['pid']).first() if part: if part.allow('view', request.current_user): progress = get_user_part_progress(dbsession, request.current_user, part) crumbs = create_part_crumbs(request, part, None) quizzes = [] stats = {} if part.type == 'page': template_path = 'wte:templates/part/view/%s.kajiki' % part.parent.display_mode if part.parent.display_mode == 'three_pane_html': help_path = ['user', 'learner', 'html_editor_part.html'] else: help_path = ['user', 'learner', 'text_part.html'] else: template_path = 'wte:templates/part/view/%s.kajiki' % part.type help_path = ['user', 'learner', 'module.html'] # Quiz Summary Generation - @Todo Should be refactored. Perhaps into the quiz view? quiz_ids = {} for quiz in dbsession.query(Quiz).join(Part).\ filter(Quiz.part_id.in_([c.id for c in part.children])).order_by(Part.order): quiz_ids[quiz.id] = len(quizzes) quizzes.append({'id': quiz.id, 'title': quiz.title, 'questions': json.loads(quiz.questions)}) if part.has_role('student', request.current_user): if quiz_ids: for answer in dbsession.query(QuizAnswer).\ filter(and_(QuizAnswer.quiz_id.in_(list(quiz_ids.keys())), QuizAnswer.user_id == request.current_user.id)): quizzes[quiz_ids[answer.quiz_id]]['answered'] = True for question in quizzes[quiz_ids[answer.quiz_id]]['questions']: if question['name'] == answer.question: question['attempts'] = answer.attempts if answer.initial_correct: question['correct'] = True question['answer'] = json.loads(answer.initial_answer) elif answer.final_correct: question['correct'] = True question['answer'] = json.loads(answer.final_answer) else: question['correct'] = False if answer.final_answer: question['answer'] = json.loads(answer.final_answer) quizzes = [quiz for quiz in quizzes if 'answered' in quiz] else: student_count = dbsession.query(UserPartRole).filter(and_(UserPartRole.part_id == part.parent_id, UserPartRole.role == 'student')).count() for quiz in quizzes: for question in quiz['questions']: question['correct'] = {'initial': 0, 'subsequent': 0, 'incorrect': 0, 'total': student_count} if quiz_ids: for answer in dbsession.query(QuizAnswer).\ filter(QuizAnswer.quiz_id.in_(list(quiz_ids.keys()))): for question in quizzes[quiz_ids[answer.quiz_id]]['questions']: if question['name'] == answer.question: if answer.initial_correct: question['correct']['initial'] = question['correct']['initial'] + 1 elif answer.final_correct: question['correct']['subsequent'] = question['correct']['subsequent'] + 1 else: question['correct']['incorrect'] = question['correct']['incorrect'] + 1 # End Quiz Summary Generation # Stats Generation if part.has_role('student', request.current_user): if progress: stats['visited'] = len(progress.visited) stats['time'] = sum([page['duration'] for page in progress.visited.values()])\ if stats['visited'] > 0 else 0 else: stats['visited'] = 0 stats['time'] = 0 else: progresses = dbsession.query(UserPartProgress).join(UserPartProgress.user, User.roles).\ filter(and_(UserPartProgress.part_id == part.id, UserPartRole.part_id == (part.parent_id if part.type == 'part' else part.id), UserPartRole.role == 'student')) total_students = dbsession.query(UserPartRole).\ filter(and_(UserPartRole.part_id == (part.parent_id if part.type == 'part' else part.id), UserPartRole.role == 'student')).count() if total_students > 0: stats['students'] = {'total': total_students, 'inprogress': 0, 'completed': 0} time_spent = [] for progress in progresses: if progress.visited: if len(progress.visited) == len([c for c in part.children if c.status == 'available']): stats['students']['completed'] = stats['students']['completed'] + 1 elif len(progress.visited) > 0: stats['students']['inprogress'] = stats['students']['inprogress'] + 1 if len(progress.visited) > 0: time_spent.append(sum([p['duration'] for p in progress.visited.values()])) if time_spent: time_spent.sort() first = math.floor(len(time_spent) * 0.25) median = math.floor(len(time_spent) * 0.5) third = math.floor(len(time_spent) * 0.75) stats['time'] = {25: time_spent[first], 50: time_spent[median], 75: time_spent[third]} # End Stats Generation labels = [child.label.title() if child.label else child.type.title() for child in part.children] return render_to_response(template_path, {'part': part, 'crumbs': crumbs, 'progress': progress, 'include_footer': part.type != 'page', 'labels': ordered_counted_set(labels), 'quizzes': quizzes, 'stats': stats, 'help': help_path}, request=request) else: if part.type == 'module': unauthorised_redirect(request) else: raise HTTPSeeOther(request.route_url('part.view', pid=part.parent_id)) else: raise HTTPNotFound()
[docs]class NewPartSchema(CSRFSchema): """The :class:`~wte.views.backend.NewPartSchema` handles the validation of a new :class:`~wte.models.Part`. """ title = formencode.validators.UnicodeString(not_empty=True) """The part's title""" parent_id = formencode.validators.Int(if_missing=None) """The parent :class:`~wte.models.Part`""" order = formencode.validators.Int(if_missing=None) """The optional order index to create the new :class:`~wte.models.Part` at""" status = formencode.All(formencode.validators.UnicodeString(if_empty='available', if_missing='available'), formencode.validators.OneOf(['unavailable', 'available'])) """The part's status""" display_mode = formencode.All(formencode.validators.UnicodeString(if_empty=None, if_missing=None), formencode.validators.OneOf([None, 'three_pane_html', 'text_only'])) """The part's display mode""" label = formencode.validators.UnicodeString(if_empty=None, if_missing=None) """The part's label"""
[docs]def create_part_crumbs(request, part, current=None): """Creates the list of breadcrumbs for a given ``part``. If the ``current`` is a ``list``, then all will be added to the end of the crumbs. :param part: The part to create the breadcrumbs to :type part: :class:`~wte.models.Part` :param current: The final, current breadcrumb :type current: ``dict`` or ``list`` :return: A list of `dict` for use in the breadcrumbs :rtype: ``list`` """ crumbs = [] recurse_part = part while recurse_part: crumbs.append({'title': recurse_part.title, 'url': request.route_url('part.view', pid=recurse_part.id)}) recurse_part = recurse_part.parent if part: if request.current_user and request.current_user.logged_in: crumbs.append({'title': 'My Modules', 'url': request.route_url('part.list', _query={'user_id': request.current_user.id})}) else: crumbs.append({'title': 'Modules', 'url': request.route_url('part.list')}) crumbs.reverse() if current: if isinstance(current, list): crumbs.extend(current) else: crumbs.append(current) crumbs[-1]['current'] = True return crumbs
@view_config(route_name='part.new', renderer='wte:templates/part/new.kajiki') @current_user() @require_logged_in()
[docs]def new(request): """Handles the ``/parts/new`` URL, providing the UI and backend for creating a new :class:`~wte.models.Part`. The required permissions depend on the type of :class:`~wte.models.Part` to create: * `module` -- User permission "modules.create" * `tutorial` -- "edit" permission on the parent :class:`~wte.models.Part` * `page` -- "edit" permission on the parent :class:`~wte.models.Part` * `exercise` -- "edit" permission on the parent :class:`~wte.models.Part` * `task` -- "edit" permission on the parent :class:`~wte.models.Part` """ dbsession = DBSession() parent = dbsession.query(Part).\ filter(Part.id == request.params['parent_id']).first()\ if 'parent_id' in request.params else None if parent and not parent.allow('edit', request.current_user): unauthorised_redirect(request) if request.matchdict['new_type'] == 'module': if not request.current_user.has_permission('modules.create'): raise unauthorised_redirect(request) elif parent: request.session.flash('You cannot create a new module here', queue='error') raise HTTPSeeOther(request.route_url('part.view', pid=parent.id)) elif request.matchdict['new_type'] == 'part': if not parent: request.session.flash('You cannot create a new part without a parent', queue='error') raise HTTPSeeOther(request.route_url('part.view', pid=parent.id)) elif parent.type != 'module': request.session.flash('You can only add parts to a module', queue='error') raise HTTPSeeOther(request.route_url('part.view', pid=parent.id)) elif request.matchdict['new_type'] == 'page': if not parent: request.session.flash('You cannot create a new page without a parent', queue='error') raise HTTPSeeOther(request.route_url('part.view', pid=parent.id)) elif parent.type != 'part': request.session.flash('You can only add pages to a part', queue='error') raise HTTPSeeOther(request.route_url('part.view', pid=parent.id)) else: request.session.flash('You cannot create a new part of that type', queue='error') raise HTTPSeeOther(request.route_url('part.list')) crumbs = create_part_crumbs(request, parent, {'title': 'Add %s' % (request.matchdict['new_type'].title()), 'url': request.current_route_url()}) help_path = ['user', 'teacher', request.matchdict['new_type'], 'new.html'] if request.method == 'POST': try: params = NewPartSchema().to_python(request.params, State(request=request)) dbsession = DBSession() with transaction.manager: if params['order'] is not None: max_order = params['order'] if parent: dbsession.add(parent) for child in parent.children: if child.order >= max_order: dbsession.add(child) child.order = child.order + 1 else: if parent: dbsession.add(parent) max_order = [p.order + 1 for p in parent.children] else: max_order = [] max_order.append(0) max_order = max(max_order) new_part = Part(title=params['title'], status=params['status'], type=request.matchdict['new_type'], display_mode=params['display_mode'], label=params['label'], parent=parent, order=max_order, content='') if request.matchdict['new_type'] == 'module': new_part.users.append(UserPartRole(user=request.current_user, role='owner')) dbsession.add(new_part) dbsession.add(new_part) raise HTTPSeeOther(request.route_url('part.edit', pid=new_part.id)) except formencode.Invalid as e: return {'errors': e.error_dict, 'values': request.params, 'crumbs': crumbs, 'help': help_path} return {'crumbs': crumbs, 'help': help_path}
[docs]class EditPartSchema(CSRFSchema): """The :class:`~wte.views.part.EditPartSchema` handles the validation for editing :class:`~wte.models.Part`. """ title = formencode.validators.UnicodeString(not_empty=True) """The part's title""" status = formencode.All(formencode.validators.UnicodeString(if_empty='available', if_missing='available'), formencode.validators.OneOf(['unavailable', 'available', 'archived'])) # TODO: Archived should only work for modules """The part's status""" display_mode = formencode.All(formencode.validators.UnicodeString(if_empty=None, if_missing=None), formencode.validators.OneOf([None, 'three_pane_html', 'text_only'])) """The part's display mode""" label = formencode.validators.UnicodeString(if_empty=None, if_missing=None) """The part's label""" content = formencode.validators.UnicodeString(if_missing='') """The ReST content""" child_part_id = formencode.ForEach(formencode.validators.Int, if_missing=None) """The child :class:`~wte.models.Part` ids for re-ordering""" template_id = formencode.ForEach(formencode.validators.Int, if_missing=None) """The :class:`~wte.models.Template` ids for re-ordering""" email_notify = formencode.validators.StringBool(if_missing=False) """Whether to send out an automatic change notification e-mail""" email_notify_text = formencode.validators.UnicodeString(if_empty='', if_missing='') """The text of the automatic change notification e-mail"""
@view_config(route_name='part.edit', renderer='wte:templates/part/edit.kajiki') @current_user() @require_logged_in()
[docs]def edit(request): """Handles the ``/parts/{pid}/edit`` URL, providing the UI and backend for editing a :class:`~wte.models.Part`. Requires that the user has "edit" rights on the :class:`~wte.models.Part`. """ dbsession = DBSession() part = dbsession.query(Part).filter(Part.id == request.matchdict['pid']).first() if part: if part.allow('edit', request.current_user): crumbs = create_part_crumbs(request, part, {'title': 'Edit', 'url': request.current_route_url()}) help_path = ['user', 'teacher', part.type, 'edit.html'] if request.method == 'POST': try: params = EditPartSchema().to_python(request.params, State(request=request)) with transaction.manager: dbsession.add(part) part.title = params['title'] part.status = params['status'] part.display_mode = params['display_mode'] part.content = params['content'] try: part.compiled_content = compile_rst(params['content'], request, part=part) extract_quizzes(dbsession, part) except Exception as e: msg = e.message.replace('<string>:', 'Invalid ReST: Line ').replace('(SEVERE/4) ', '') msg = msg[:msg.find('\n')] raise formencode.Invalid('Invalid ReST', params['content'], None, error_dict={'content': msg}) part.label = params['label'] if params['child_part_id']: for idx, cpid in enumerate(params['child_part_id']): child_part = dbsession.query(Part).filter(and_(Part.id == cpid, Part.parent_id == part.id)).first() if child_part: child_part.order = idx if params['template_id']: for idx, tid in enumerate(params['template_id']): template = dbsession.query(Asset).filter(Asset.id == tid).first() if template: template.order = idx dbsession.add(part) # Send a change-notification e-mail if params['email_notify'] and params['email_notify_text'].strip(): if part.type == 'module': users = part.users elif part.type == 'part': users = part.parent.users elif part.type == 'page': users = part.parent.parent.users for user_role in users: if user_role.user.id != request.current_user.id: send_email(request, user_role.user.email, get_config_setting(request, 'email.sender', default='no-reply@example.com'), 'Update to %s' % part.title, params['email_notify_text']) print(user_role.user.email) raise HTTPSeeOther(request.route_url('part.view', pid=request.matchdict['pid'])) except formencode.Invalid as e: dbsession.add(part) return {'errors': e.error_dict, 'values': request.params, 'part': part, 'crumbs': crumbs, 'help': help_path} return {'part': part, 'crumbs': crumbs, 'help': help_path} else: unauthorised_redirect(request) else: raise HTTPNotFound()
[docs]class RegisterSettingsSchema(CSRFSchema): """The :class:`~wte.views.part.RegisterSettingsSchema` handles the validation for modifying the ``access_rights`` of a :class:`~wte.models.Part`. """ require = formencode.ForEach(formencode.validators.OneOf(['password', 'email_domains'])) """Which access rights are required""" password = formencode.validators.UnicodeString(if_missing=None) """The password to use for access rights""" email_domains = formencode.validators.UnicodeString(if_missing=None) """The email domains that the user must belong to"""
@view_config(route_name='part.register.settings', renderer='wte:templates/part/register_settings.kajiki') @current_user() @require_logged_in()
[docs]def edit_register_settings(request): """Handles the ``/parts/{pid}/register/settings`` URL, providing the UI and backend for editing the settings for registering for a "module" type :class:`~wte.models.Part`. Requires that the user has "edit" rights on the :class:`~wte.models.Part`. """ dbsession = DBSession() part = dbsession.query(Part).filter(Part.id == request.matchdict['pid']).first() if part: if part.allow('edit', request.current_user): if part.type != 'module': request.session.flash('Access rights can only be set on modules', queue='info') raise HTTPSeeOther(request.route_url('part.view', pid=request.matchdict['pid'])) if part.access_rights: rights = json.loads(part.access_rights) else: rights = {} crumbs = create_part_crumbs(request, part, {'title': 'Access Settings', 'url': request.current_route_url()}) if request.method == 'POST': try: params = RegisterSettingsSchema().to_python(request.params, State(request=request)) with transaction.manager: rights = {} if params['require']: if 'password' in params['require'] and params['password']: rights['password'] = params['password'] if 'email_domains' in params['require'] and params['email_domains']: rights['email_domains'] = [ed.strip() for ed in params['email_domains'].split(',')] part.access_rights = json.dumps(rights) dbsession.add(part) dbsession.add(part) raise HTTPSeeOther(request.route_url('part.view', pid=request.matchdict['pid'])) except formencode.Invalid as e: return {'errors': e.error_dict, 'valuse': request.params, 'part': part, 'crumbs': crumbs, 'rights': rights, 'help': ['user', 'teacher', 'module', 'access_settings.html']} return {'part': part, 'crumbs': crumbs, 'rights': rights, 'help': ['user', 'teacher', 'module', 'access_settings.html']} else: unauthorised_redirect(request) else: raise HTTPNotFound()
@view_config(route_name='part.delete', renderer='wte:templates/part/delete.kajiki') @current_user() @require_logged_in()
[docs]def delete(request): """Handles the ``/modules/{mid}/parts/{pid}/delete`` URL, providing the UI and backend for deleting a :class:`~wte.models.Part`. Requires that the user has "delete" rights on the :class:`~wte.models.Part`. """ dbsession = DBSession() part = dbsession.query(Part).filter(Part.id == request.matchdict['pid']).first() if part: if part.allow('delete', request.current_user): crumbs = create_part_crumbs(request, part, {'title': 'Delete', 'url': request.route_url('part.delete', pid=part.id)}) if request.method == 'POST': try: CSRFSchema().to_python(request.params, State(request=request)) parent = part.parent with transaction.manager: dbsession.add(part) for progress in dbsession.query(UserPartProgress).\ filter(UserPartProgress.current_id == part.id): progress.current_id = None dbsession.delete(part) if parent: dbsession.add(parent) raise HTTPSeeOther(request.route_url('part.view', pid=parent.id)) else: dbsession.add(request.current_user) raise HTTPSeeOther(request.route_url('part.list', _query={'user_id': request.current_user.id})) except formencode.Invalid as e: return {'errors': e.error_dict, 'part': part, 'crumbs': crumbs, 'help': ['user', 'teacher', part.type, 'delete.html']} return {'part': part, 'crumbs': crumbs, 'help': ['user', 'teacher', part.type, 'delete.html']} else: unauthorised_redirect(request) else: raise HTTPNotFound()
@view_config(route_name='part.preview', renderer='json') @current_user() @require_logged_in()
[docs]def preview(request): """Handles the ``/parts/{pid}/rst_preview`` URL, generating an HTML preview of the submitted ReST. The ReST text to render has to be set as the ``content`` parameter. In addition to rendering the ReST the in the same way that it is rendered when saving a :class:`~wte.models.Part`, this will also insert a <span id="focus"></span> at the current cursor position indicated by the string "§§§§§§§". Requires that the user has "edit" rights on the current :class:`~wte.models.Part`. """ dbsession = DBSession() part = dbsession.query(Part).filter(Part.id == request.matchdict['pid']).first() if part: if part.allow('edit', request.current_user): if 'content' in request.params: try: content = compile_rst(request.params['content'], request, part=part, line_numbers=True) except Exception as e: content = ['<div class="callout alert"><p>'] for line in e.message.split('\n'): if line: content.append(line) content.append('<br />') else: content.append('</p><p>') content.append('</p></div>') content = ''.join(content).replace('<br /></p>', '</p>').\ replace('<string>:', 'Line ').replace('(SEVERE/4) ', '') return {'content': content} else: raise HTTPNotFound() else: raise HTTPForbidden() else: raise HTTPNotFound()
# Todo: CSRF Protection @view_config(route_name='part.register', renderer='wte:templates/part/register.kajiki') @current_user() @require_logged_in()
[docs]def register(request): """Handles the ``/parts/{pid}/register`` URL, to allow users to register for a :class:`~wte.models.Part` that is a "module". """ dbsession = DBSession() part = dbsession.query(Part).filter(Part.id == request.matchdict['pid']).first() if part: if part.type != 'module': request.session.flash('You can only register for modules', queue='error') raise HTTPSeeOther(request.route_url('part.view', pid=request.matchdict['pid'])) if part.has_role(['student', 'owner'], request.current_user): request.session.flash('You are already registered for this module', queue='info') raise HTTPSeeOther(request.route_url('part.view', pid=request.matchdict['pid'])) if part.status != 'available': request.session.flash('You cannot register for this module as it is %s' % (part.status), queue='auth') raise HTTPSeeOther(request.route_url('part.list')) crumbs = create_part_crumbs(request, part, {'title': 'Register', 'url': request.route_url('part.register', pid=part.id)}) if request.method == 'POST': if part.access_rights: rights = json.loads(part.access_rights) if rights: if 'password' in rights and 'email_domains' in rights: if request.current_user.email[request.current_user.email.find('@') + 1:]\ in rights['email_domains']: if 'password' not in request.params or request.params['password'] != rights['password']: e = formencode.Invalid('Please provide the correct password', None, None, error_dict={'password': 'Please provide the correct password'}) e.params = request.params return {'part': part, 'crumbs': crumbs, 'e': e} else: request.session.flash('''Unfortunately you cannot take this module as your e-mail address is not in the list of authorised e-mail domains.''', queue='auth') raise HTTPSeeOther(request.route_url('part.view', pid=request.matchdict['pid'])) elif 'password' in rights: if 'password' not in request.params or request.params['password'] != rights['password']: return {'part': part, 'crumbs': crumbs, 'invalid_password': True} elif 'email_domains' in rights: if request.current_user.email[request.current_user.email.find('@') + 1:] \ not in rights['email_domains']: request.session.flash('Unfortunately you cannot take this module as your e-mail address ' + 'is not in the list of authorised e-mail domains.''', queue='auth') raise HTTPSeeOther(request.route_url('part.view', pid=request.matchdict['pid'])) with transaction.manager: dbsession.add(UserPartRole(user=request.current_user, part=part, role='student')) raise HTTPSeeOther(request.route_url('part.view', pid=request.matchdict['pid'])) return {'part': part, 'crumbs': crumbs} else: raise HTTPNotFound()
[docs]def get_all_parts(part): """Recursively returns the :class:`~wte.models.Part` and all its children. :param part: The :class:`~wte.models.Part` for which to find it children :type part: :class:`~wte.models.Part` :return: The ``part`` and all its children :rtype: ``list`` """ parts = [part] for child in part.children: parts.extend(get_all_parts(child)) return parts
# Todo: CSRF Protection @view_config(route_name='part.deregister', renderer='wte:templates/part/deregister.kajiki') @current_user() @require_logged_in()
[docs]def deregister(request): """Handles the ``/parts/{pid}/deregister`` URL, to allow users to de-register from a :class:`~wte.models.Part` that is a "module". """ dbsession = DBSession() part = dbsession.query(Part).filter(Part.id == request.matchdict['pid']).first() if part: if not part.has_role('student', request.current_user): request.session.flash('You are not registered for this module', queue='info') raise HTTPSeeOther(request.route_url('part.view', pid=request.matchdict['pid'])) crumbs = create_part_crumbs(request, part, {'title': 'De-Register', 'url': request.route_url('part.deregister', pid=part.id)}) if request.method == 'POST': with transaction.manager: dbsession.add(part) role = dbsession.query(UserPartRole).\ filter(and_(UserPartRole.part_id == request.matchdict['pid'], UserPartRole.user_id == request.current_user.id, UserPartRole.role == 'student')).first() if role: dbsession.delete(role) parts = get_all_parts(part) for child_part in parts: progress = dbsession.query(UserPartProgress).\ filter(and_(UserPartProgress.part_id == child_part.id, UserPartProgress.user_id == request.current_user.id)).first() if progress: dbsession.delete(progress) raise HTTPSeeOther(request.route_url('part.view', pid=request.matchdict['pid'])) return {'part': part, 'crumbs': crumbs} else: raise HTTPNotFound()
[docs]class ChangeStatusSchema(CSRFSchema): """The :class:`~wte.views.part.ChangeStatusSchema` handles the validation for editing :class:`~wte.models.Part`. """ status = formencode.All(formencode.validators.UnicodeString(not_empty=True), formencode.validators.OneOf(['unavailable', 'available', 'archived'])) """The part's status""" return_to = formencode.validators.String(if_missing=None) """Return to this URL"""
@view_config(route_name='part.change_status', renderer='wte:templates/part/change_status.kajiki') @current_user() @require_logged_in()
[docs]def change_status(request): """Handles the ``/parts/{pid}/change_status`` URL, providing the UI and backend for changing the status of a :class:`~wte.models.Part`. Requires that the user has "edit" rights on the :class:`~wte.models.Part`. """ dbsession = DBSession() part = dbsession.query(Part).filter(Part.id == request.matchdict['pid']).first() if part: if part.allow('edit', request.current_user): crumbs = create_part_crumbs(request, part, {'title': 'Change Status', 'url': request.current_route_url()}) if request.method == 'POST': try: params = ChangeStatusSchema().to_python(request.params, State(request=request)) with transaction.manager: dbsession.add(part) part.status = params['status'] dbsession.add(part) if params['return_to']: raise HTTPSeeOther(params['return_to']) else: raise HTTPSeeOther(request.route_url('part.view', pid=request.matchdict['pid'])) except formencode.Invalid as e: return {'errors': e.error_dict, 'values': request.params, 'part': part, 'crumbs': crumbs} return {'part': part, 'crumbs': crumbs} else: unauthorised_redirect(request) else: raise HTTPNotFound()
CROSSREF_PATTERN = re.compile(r':crossref:`(?:([0-9]+)|(?:(.*)<([0-9]+)>))`')
[docs]def crossref_replace(match, id_mapping): """The :func:`~wte.views.part.crossref_replace` function updates a single cross-reference ReST role identified by the ``match`` parameter with the correct new target part identifier from the ``id_mapping``. If the identifier in the current cross-reference is not in the ``id_mapping``, then it is replaced with the string 'external'. :param match: The regexp match :type match: :class:`~re.MatchObject` :param id_mapping: The identifier change mappings :type id_mapping: `dict` :return: The updated cross-reference :return_type: `string` """ groups = match.groups() part_id = groups[0] if groups[0] else groups[2] title = None if groups[0] else groups[1] if part_id in id_mapping: if title: return ':crossref:`%s<%s>`' % (title, id_mapping[part_id]) else: return ':crossref:`%s`' % (id_mapping[part_id]) else: if title: return ':crossref:`%s<external>`' % (title) else: return ':crossref:`external`'
@view_config(route_name='part.export') @current_user() @require_logged_in()
[docs]def export(request): """Handles the ``/parts/{pid}/export`` URL, providing the UI and backend for exporting a :class:`~wte.models.Part`. The difference between exporting and downloading (:func:`~wte.views.part.download`) is that exporting creates an archive that can be imported into another WTE instance, while downloading creates an HTML version for offline viewing. Requires that the user has "edit" rights on the :class:`~wte.models.Part`. """ def part_as_dict(part, assets, id_mapping): data = {'id': len(id_mapping), 'type': part.type, 'display_mode': part.display_mode, 'label': part.label, 'title': part.title, 'status': part.status, 'order': part.order, 'content': part.content} id_mapping[str(part.id)] = data['id'] if part.children: data['children'] = [part_as_dict(child, assets, id_mapping) for child in part.children] if part.all_assets: data['assets'] = [] for asset in part.all_assets: data['assets'].append({'id': len(assets), 'filename': asset.filename, 'mimetype': asset.mimetype, 'type': asset.type, 'order': asset.order}) assets.append((len(assets), asset.id)) return data def fix_references(data, id_mapping): if 'content' in data and data['content']: data['content'] = re.sub(CROSSREF_PATTERN, lambda m: crossref_replace(m, id_mapping), data['content']) if 'children' in data: data['children'] = [fix_references(child, id_mapping) for child in data['children']] return data dbsession = DBSession() part = dbsession.query(Part).filter(Part.id == request.matchdict['pid']).first() if part: if part.allow('edit', request.current_user): crumbs = create_part_crumbs(request, part, {'title': 'Export', 'url': request.current_route_url()}) if request.method == 'POST': body = BytesIO() body_zip = ZipFile(body, 'w') assets = [] id_mapping = {} data = part_as_dict(part, assets, id_mapping) data = fix_references(data, id_mapping) body_zip.writestr('content.json', json.dumps(data), ZIP_DEFLATED) for export_id, asset_id in assets: asset = dbsession.query(Asset).filter(Asset.id == asset_id).first() if asset: if asset.data: if asset.mimetype.startswith('text'): body_zip.writestr('assets/%s' % (export_id), asset.data, ZIP_DEFLATED) else: body_zip.writestr('assets/%s' % (export_id), asset.data, ZIP_STORED) else: body_zip.writestr('assets/%s' % (export_id), '', ZIP_DEFLATED) body_zip.close() return Response(body=body.getvalue(), headers=[('Content-Type', 'application/zip'), ('Content-Disposition', native_str('attachment; filename="%s.zip"' % (part.title.replace('/', '_'))))]) return render_to_response('wte:templates/part/export.kajiki', {'part': part, 'crumbs': crumbs, 'help': ['user', 'teacher', 'import_export.html']}, request=request) else: unauthorised_redirect(request) else: raise HTTPNotFound()
[docs]class ImportPartConverter(formencode.FancyValidator): """The :class:`~wte.views.part.ImportPartConverter` converts a :class:`~cgi.FieldStorage` into a (``dict``, :class:`~zipfile.ZipFile`) ``tuple``. Checks that the uploaded zip file is actually a valid file and that it can be imported with the given parent :class:`~wte.models.Part`. The ``dict`` contains the :class:`~wte.models.Part` data to import. """ messages = {'notzip': 'Only ZIP files can be imported', 'invalidstructure': 'The ZIP file does not have the correct internal structure', 'nopart': 'The ZIP file does not contain any data to import', 'invalidpart': 'You cannot import a %(part_type)s here'} def _convert_to_python(self, value, state): """Convert the submitted ``value`` into a (``dict``, :class:`~zipfile.ZipFile`) ``tuple``. Checks that the uploaded file is a valid ZIP file and contains the "content.json" file. :param value: The uploaded file :type value: `~cgi.FieldStorage` :param state: The state object :return: The converted part import data :rtype: (``dict``, :class:`~zipfile.ZipFile`) ``tuple`` """ try: zip_file = ZipFile(value.file) zip_file.getinfo('content.json') with zip_file.open('content.json') as data: data = data.read() if not IS_PYTHON2: data = data.decode('utf-8') data = json.loads(data) return (data, zip_file) except BadZipfile: raise formencode.api.Invalid(self.message('notzip', state), value, state) except KeyError: zip_file.close() raise formencode.api.Invalid(self.message('invalidstructure', state), value, state) except ValueError: zip_file.close() raise formencode.api.Invalid(self.message('nopart', state), value, state) def _validate_python(self, value, state): """Checks that the uploaded "content.json" file is actually valid. Checks that the JSON data is catually a ``dict`` and that it has the minium fields "title" and "type". Also checks if there is a ``state.parent`` object, whether the :class:`~wte.models.Part` data to import can be imported into the ``state.parent`` :class:`~wte.models.Part`. """ (data, zip_file) = value if not isinstance(data, dict): zip_file.close() raise formencode.api.Invalid(self.message('nopart', state), value, state) if 'title' not in data or 'type' not in data: zip_file.close() raise formencode.api.Invalid(self.message('nopart', state), value, state) if not state.parent and data['type'] != 'module': zip_file.close() raise formencode.api.Invalid(self.message('invalidpart', state, part_type=data['type']), value, state) if state.parent: if state.parent.type == 'module': if data['type'] not in ['tutorial', 'exercise', 'part']: raise formencode.api.Invalid(self.message('invalidpart', state, part_type=data['type']), value, state) elif state.parent.type == 'part': if data['type'] != 'page': raise formencode.api.Invalid(self.message('invalidpart', state, part_type=data['type']), value, state) elif state.parent.type == 'tutorial': # Handle old "page" Part imports if data['type'] != 'page': raise formencode.api.Invalid(self.message('invalidpart', state, part_type=data['type']), value, state) elif state.parent.type == 'exercise': # Handle old "task" Part imports if data['type'] != 'task': raise formencode.api.Invalid(self.message('invalidpart', state, part_type=data['type']), value, state) else: raise formencode.api.Invalid(self.message('invalidpart', state, part_type=data['type']), value, state)
[docs]class ImportPartSchema(CSRFSchema): """The :class:`~wte.views.part.ImportPartSchema` validates the request to import a :class:`~wte.models.Part` (and any children and :class:`~wte.models.Asset`). Uses the :class:`~wte.views.part.ImportPartConverter` to actually check whether the uploaded file is a valid export generated from :func:`~wte.views.part.export`. """ parent_id = formencode.validators.Int(if_missing=None) """The id of the parent :class:`~wte.models.Part` to import into.""" file = formencode.compound.Pipe(formencode.validators.FieldStorageUploadConverter(not_empty=True), ImportPartConverter()) """The file to import."""
@view_config(route_name='part.import', renderer='wte:templates/part/import.kajiki') @current_user() @require_logged_in()
[docs]def import_file(request): """Handles the ``/parts/import`` URL, providing the UI and backend for importing a :class:`~wte.models.Part`. The required permissions depend on the type of :class:`~wte.models.Part` to create: * `module` -- User permission "modules.create" * `tutorial` -- "edit" permission on the parent :class:`~wte.models.Part` * `page` -- "edit" permission on the parent :class:`~wte.models.Part` * `exercise` -- "edit" permission on the parent :class:`~wte.models.Part` * `task` -- "edit" permission on the parent :class:`~wte.models.Part` """ def recursive_import(data, zip_file, id_mapping): part_type = data['type'] if part_type == 'task': part_type = 'page' elif part_type in ['tutorial', 'exercise']: part_type = 'part' part = Part(type=part_type, title=data['title'], status='available' if part_type == 'page' else 'unavailable') if 'id' in data: id_mapping[str(data['id'])] = part if 'order' in data: try: part.order = int(data['order']) except ValueError: part.order = 0 if 'content' in data: part.content = data['content'] if part_type == 'part' and 'display_mode' in data: part.display_mode = data['display_mode'] elif part_type == 'part': part.display_mode = 'three_pane_html' if 'label' in data: part.label = data['label'] else: if data['type'] in ['tutorial', 'exercise', 'page', 'task']: part.label = data['type'].title() if 'assets' in data: for tmpl in data['assets']: if 'filename' in tmpl and 'mimetype' in tmpl and 'id' in tmpl and 'type' in tmpl: try: content = zip_file.open('assets/%i' % (tmpl['id'])) asset = Asset(filename=tmpl['filename'], mimetype=tmpl['mimetype'], data=content.read(), type=tmpl['type']) if 'order' in tmpl: try: asset.order = int(tmpl['order']) except ValueError: asset.order = 0 part.all_assets.append(asset) except: pass if 'children' in data: for child in data['children']: child_part = recursive_import(child, zip_file, id_mapping) part.children.append(child_part) child_part.parent = part return part def fix_references(part, dbsession, id_mapping): dbsession.add(part) if part.content: part.content = re.sub(CROSSREF_PATTERN, lambda m: crossref_replace(m, id_mapping), part.content) part.compiled_content = compile_rst(part.content, request, part=part) if part.children: for child in part.children: fix_references(child, dbsession, id_mapping) dbsession = DBSession() parent = dbsession.query(Part).\ filter(Part.id == request.params['parent_id']).first()\ if 'parent_id' in request.params else None if parent and not parent.allow('edit', request.current_user): unauthorised_redirect(request) elif not parent and not request.current_user.has_permission('modules.create'): unauthorised_redirect(request) crumbs = create_part_crumbs(request, parent, {'title': 'Import', 'url': request.current_route_url()}) if request.method == 'POST': try: params = ImportPartSchema().to_python(request.params, State(parent=parent, request=request)) with transaction.manager: if parent: dbsession.add(parent) id_mapping = {} part = recursive_import(params['file'][0], params['file'][1], id_mapping) dbsession.add(part) params['file'][1].close() if part.type == 'module': part.users.append(UserPartRole(user=request.current_user, role='owner')) if parent: parent.children.append(part) with transaction.manager: for key, value in list(id_mapping.items()): dbsession.add(value) id_mapping[key] = value.id fix_references(part, dbsession, id_mapping) dbsession.add(part) raise HTTPSeeOther(request.route_url('part.view', pid=part.id)) except formencode.Invalid as e: return {'errors': e.error_dict, 'valuse': request.params, 'crumbs': crumbs, 'help': ['user', 'teacher', 'import_export.html']} return {'crumbs': crumbs, 'help': ['user', 'teacher', 'import_export.html']}
# Todo: Add a download launch page for non-JS users @view_config(route_name='part.download') @require_method('POST') @current_user() @require_logged_in()
[docs]def download(request): """Handles the ``/parts/{pid}/download`` URL, providing the UI and backend for downloading a :class:`~wte.models.Part`. The difference between exporting (:func:`~wte.views.part.export`) and downloading is that exporting creates an archive that can be imported into another WTE instance, while downloading creates an HTML version for offline viewing. """ def download_part(base_path, part, body_zip, parents=None): response = render_to_response('wte:templates/part/download.kajiki', {'part': part, 'parents': parents}, request=request) body_zip.writestr('%s/%s.html' % (base_path, part.id), response.body) for child in part.children: if child.allow('view', request.current_user): download_part(base_path, child, body_zip, parents + [part] if parents else [part]) for asset in part.assets: body_zip.writestr('%s/%s/assets/%s' % (base_path, part.id, asset.filename), asset.data if asset.data else '') for template in part.templates: template_data = template.data if template.data else native_str('') body_zip.writestr('%s/%s/%s' % (base_path, part.id, template.filename), template_data) dbsession = DBSession() part = dbsession.query(Part).filter(Part.id == request.matchdict['pid']).first() if part: if part.allow('view', request.current_user): body = BytesIO() body_zip = ZipFile(body, 'w') for target, source in [('%s/_static/application.min.css', 'static/css/application.min.css'), ('%s/_static/foundation-icons.eot', 'static/css/foundation-icons.eot'), ('%s/_static/foundation-icons.svg', 'static/css/foundation-icons.svg'), ('%s/_static/foundation-icons.ttf', 'static/css/foundation-icons.ttf'), ('%s/_static/foundation-icons.woff', 'static/css/foundation-icons.woff'), ('%s/_static/libraries.min.js', 'static/js/libraries.min.js')]: body_zip.writestr(target % part.title.replace('/', '_'), resource_string('wte', source)) index_html = '''<!DOCTYPE html> <html> <head> <title>%(title)s</title> <script> window.location.href = '%(url)s'; </script> <meta http-equiv="refresh" content="1; url=%(url)s"> </head> <body> <p>The main content can be found <a href="%(url)s">here</a></p> </body> </html>''' % {'title': part.title, 'url': '%s.html' % part.id} if IS_PYTHON2: index_html = index_html.encode('utf-8') body_zip.writestr('%s/index.html' % (part.title.replace('/', '_')), index_html) download_part(part.title.replace('/', '_'), part, body_zip) body_zip.close() return Response(body=body.getvalue(), headers=[('Content-Type', 'application/zip'), ('Content-Disposition', native_str('attachment; filename="%s.zip"' % (part.title.replace('/', '_'))))]) else: unauthorised_redirect(request) else: raise HTTPNotFound()
[docs]class ResetFilesSchema(CSRFSchema): """Validator for resetting all the files in the user's :class:`~wte.models.UserPartProgress` to their initial values.""" filename = formencode.validators.UnicodeString(if_empty=None, if_missing=None) """The optional filename to reset."""
@view_config(route_name='part.reset-files', renderer='wte:templates/part/reset_files.kajiki') @current_user() @require_logged_in()
[docs]def reset_files(request): """Handles the ``/parts/{pid}/reset_files`` URL, providing the UI and backend for resetting all :class:`~wte.models.Assets` of a :class:`~wte.models.Part` to the default for the current user. Requires that the user has "view" rights on the current :class:`~wte.models.Part`. """ dbsession = DBSession() part = dbsession.query(Part).filter(Part.id == request.matchdict['pid']).first() if part: if part.allow('view', request.current_user): crumbs = create_part_crumbs(request, part, {'title': 'Discard all Changes', 'url': request.current_route_url()}) if request.method == 'POST': try: params = ResetFilesSchema().to_python(request.params, State(request=request)) with transaction.manager: progress = get_user_part_progress(dbsession, request.current_user, part) for user_file in list(progress.files): if not params['filename'] or params['filename'] == user_file.filename: progress.files.remove(user_file) dbsession.delete(user_file) raise HTTPSeeOther(request.route_url('part.view', pid=request.matchdict['pid'])) except formencode.Invalid as e: return {'errors': e.error_dict, 'part': part, 'crumbs': crumbs} return {'part': part, 'crumbs': crumbs} else: unauthorised_redirect(request) else: raise HTTPNotFound()
@view_config(route_name='part.progress.download') @current_user() @require_logged_in()
[docs]def download_part_progress(request): """Handles the ``/users/{uid}/progress/{pid}/download`` URL, sending back the complete set of data associated with the :class:`~wte.models.UserPartProgress`. Requires that the user has "view" rights on the :class:`~wte.models.UserPartProgress`. """ dbsession = DBSession() part = dbsession.query(Part).filter(Part.id == request.matchdict['pid']).first() if part: if part.allow('view', request.current_user): progress = get_user_part_progress(dbsession, request.current_user, part) basepath = progress.part.title filename = progress.part.title parent = progress.part.parent while parent: basepath = '%s/%s' % (parent.title, basepath) filename = '%s - %s' % (parent.title, filename) parent = parent.parent basepath = '%s/' % (basepath) filename = '%s.zip' % (filename) body = BytesIO() zipfile = ZipFile(body, mode='w') for user_file in progress.files: zipfile.writestr('%s%s' % (basepath, user_file.filename), user_file.data) for asset in progress.part.assets: zipfile.writestr('%s/assets/%s' % (basepath, asset.filename), asset.data) zipfile.close() return Response(body=body.getvalue(), headerlist=[('Content-Type', 'application/zip'), ('Content-Disposition', 'attachment; filename="%s"' % (filename.replace('/', '_')))]) else: unauthorised_redirect(request) else: raise HTTPNotFound()
@view_config(route_name='part.progress.update', renderer='json') @current_user() @require_logged_in()
[docs]def update_part_progress(request): """Handles the ``/users/{uid}/progress/{pid}/download`` URL, sending back the complete set of data associated with the :class:`~wte.models.UserPartProgress`. Requires that the user has "view" rights on the :class:`~wte.models.UserPartProgress`. """ dbsession = DBSession() part = dbsession.query(Part).filter(Part.id == request.matchdict['pid']).first() request.response.cache_control = 'no-cache' if part: if part.allow('view', request.current_user): progress = get_user_part_progress(dbsession, request.current_user, part) if 'duration' in request.params: with transaction.manager: dbsession.add(progress) dbsession.add(part) progress.visited[str(part.id)]['duration'] = progress.visited[str(part.id)]['duration'] +\ int(int(request.params['duration']) / 1000) return {} else: unauthorised_redirect(request) else: raise HTTPNotFound()
@view_config(route_name='part.search', renderer='json') @current_user() @require_logged_in()
[docs]def search(request): """Handles the ``/parts/search`` URL, searching all :class:`~wte.models.Part` for matches to the request parameter 'q'. Returns all :class:`~wte.models.Part` that the current user is allowed to view. """ dbsession = DBSession() parts = [] if 'q' in request.params and request.params['q']: for part in dbsession.query(Part).filter(Part.title.like('%%%s%%' % request.params['q'])): if part.allow('view', request.current_user): parts.append({'id': part.id, 'value': part.title}) return parts