#!/usr/bin/env python3 """ SPORE COMMUNITY HUB Minimal, anonymous, self-cleaning community board for tiny systems Designed to run on router hardware with minimal resources Enhanced with audio player, photo viewer, and Kiwix ZIM support """ import os import json import time import hashlib import signal from datetime import datetime, timedelta from flask import Flask, render_template_string, request, send_file, jsonify, Response import mimetypes import subprocess import re app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024 # 10MB max file size # Minimal filesystem-based storage BASE_DIR = os.path.dirname(os.path.abspath(__file__)) DATA_DIR = os.path.join(BASE_DIR, 'spore_data') NOTICES_DIR = os.path.join(DATA_DIR, 'notices') PHOTOS_DIR = os.path.join(DATA_DIR, 'photos') MUSIC_DIR = os.path.join(DATA_DIR, 'music') ZIM_DIR = os.path.join(DATA_DIR, 'zim') VOTES_FILE = os.path.join(DATA_DIR, 'votes.json') ZIM_STATE_FILE = os.path.join(DATA_DIR, '.zim_state') # Create directories for d in [DATA_DIR, NOTICES_DIR, PHOTOS_DIR, MUSIC_DIR, ZIM_DIR]: os.makedirs(d, exist_ok=True) # Initialize votes file if not os.path.exists(VOTES_FILE): with open(VOTES_FILE, 'w') as f: json.dump({}, f) # Check if kiwix-serve is available KIWIX_AVAILABLE = subprocess.run(['which', 'kiwix-serve'], capture_output=True).returncode == 0 KIWIX_PORT = 8081 # Port for kiwix-serve KIWIX_PROCESS = None # Main page items limit MAIN_PAGE_LIMIT = 5 # Minimal HTML template - single string, no external files needed HTML_TEMPLATE = ''' 🌱 Spore Hub

🌱 SPORE COMMUNITY HUB 🌱

anonymous / ephemeral / free

📢 Community Notices

{% for notice in notices %}
{{ notice.content }}
expires: {{ notice.expires }}
{% endfor %}

📸 Photo Gallery

Vote on photos | -5 score = deletion | ⚠️ = report abuse

{% for photo in photos[:main_limit] %}
community photo {{ photo.votes }}
{% endfor %}
📸 View all {{ total_photos }} photos →

🎵 Music Shares

Vote on tracks | -5 score = deletion | ⚠️ = report abuse

{% for track in music[:main_limit] %}
🎵 {{ track.filename }}
{{ track.votes }}
{% endfor %}
🎵 View all {{ total_music }} tracks →

📚 Knowledge Library (ZIM Files)

{% if kiwix_available %} {% if zim_files %}
📖 Kiwix server is running! Access the library at: http://{{ request.host.split(':')[0] }}:{{ kiwix_port }}

Available ZIM archives:

{% for zim in zim_files[:main_limit] %}
📕 {{ zim }}
{% endfor %} 📚 View all {{ total_zim }} ZIM archives → {% else %}

No ZIM files found. Place .zim files in the zim directory to make them available.

{% endif %} {% else %}

⚠️ Kiwix-serve not found. Install kiwix-tools to enable offline Wikipedia/knowledge access.

Ubuntu/Debian: sudo apt install kiwix-tools

Or download from: kiwix.org

{% endif %}

⚠️ Report Abusive Content

This will immediately remove the content.

Only use this for content that is:

Think carefully before bypassing democracy.

The community voting system should handle most content moderation.

''' # Directory listing template BROWSE_TEMPLATE = ''' 🌱 Browse {{ category|title }} ← Back to Hub

📁 Browse {{ category|title }}

{% if category == 'photos' %}
{% for photo in items %}
photo {{ photo.votes }}
{% endfor %}
{% elif category == 'music' %}
{% for track in items %}
🎵 {{ track.filename }}
{{ track.votes }}
{% endfor %}
{% elif category == 'zim' %}
{% for zim in items %}
📕 {{ zim }}
{% endfor %}
{% endif %} ''' def get_audio_type(filename): """Get MIME type for audio file""" ext = filename.lower().rsplit('.', 1)[-1] types = { 'mp3': 'audio/mpeg', 'ogg': 'audio/ogg', 'wav': 'audio/wav', 'm4a': 'audio/mp4' } return types.get(ext, 'audio/mpeg') def clean_expired(): """Remove expired notices""" now = time.time() for f in os.listdir(NOTICES_DIR): fpath = os.path.join(NOTICES_DIR, f) try: with open(fpath, 'r') as file: data = json.load(file) if data['expires'] < now: os.remove(fpath) except: pass def get_votes(): """Get current votes""" try: with open(VOTES_FILE, 'r') as f: return json.load(f) except: return {} def save_votes(votes): """Save votes to file""" with open(VOTES_FILE, 'w') as f: json.dump(votes, f) def get_zim_state(): """Get hash of current ZIM directory state""" zim_files = sorted([f for f in os.listdir(ZIM_DIR) if f.endswith('.zim')]) state = json.dumps(zim_files) return hashlib.md5(state.encode()).hexdigest() def save_zim_state(state): """Save ZIM state""" with open(ZIM_STATE_FILE, 'w') as f: f.write(state) def get_saved_zim_state(): """Get saved ZIM state""" try: with open(ZIM_STATE_FILE, 'r') as f: return f.read().strip() except: return None def stop_kiwix_server(): """Stop kiwix-serve process""" global KIWIX_PROCESS if KIWIX_PROCESS: try: KIWIX_PROCESS.terminate() KIWIX_PROCESS.wait(timeout=5) except: KIWIX_PROCESS.kill() KIWIX_PROCESS = None # Also kill any orphaned kiwix-serve processes subprocess.run(['pkill', '-f', 'kiwix-serve'], capture_output=True) def start_kiwix_server(): """Start kiwix-serve if available and ZIM files exist""" global KIWIX_PROCESS if not KIWIX_AVAILABLE: return zim_files = [f for f in os.listdir(ZIM_DIR) if f.endswith('.zim')] if not zim_files: return # Check if ZIM directory has changed current_state = get_zim_state() saved_state = get_saved_zim_state() if saved_state != current_state: print("ZIM directory changed, restarting kiwix-serve...") stop_kiwix_server() save_zim_state(current_state) # Check if already running if KIWIX_PROCESS and KIWIX_PROCESS.poll() is None: return # Start kiwix-serve zim_paths = [os.path.join(ZIM_DIR, f) for f in zim_files] cmd = ['kiwix-serve', '--port', str(KIWIX_PORT)] + zim_paths KIWIX_PROCESS = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) print(f"Started kiwix-serve on port {KIWIX_PORT} with {len(zim_files)} ZIM files") @app.route('/') def index(): clean_expired() start_kiwix_server() # Check and restart if needed # Get notices notices = [] for f in sorted(os.listdir(NOTICES_DIR), reverse=True)[:20]: # Last 20 notices try: with open(os.path.join(NOTICES_DIR, f), 'r') as file: data = json.load(file) data['expires'] = datetime.fromtimestamp(data['expires']).strftime('%Y-%m-%d %H:%M') notices.append(data) except: pass # Get photos with vote counts votes = get_votes() photos = [] for f in os.listdir(PHOTOS_DIR): if f.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')): photos.append({ 'filename': f, 'votes': votes.get(f'photo_{f}', 0) }) # Get music files with vote counts music = [] for f in os.listdir(MUSIC_DIR): if f.lower().endswith(('.mp3', '.ogg', '.wav', '.m4a')): music.append({ 'filename': f, 'votes': votes.get(f'music_{f}', 0) }) # Get ZIM files zim_files = [f for f in os.listdir(ZIM_DIR) if f.endswith('.zim')] return render_template_string(HTML_TEMPLATE, notices=notices, photos=photos, total_photos=len(photos), music=music, total_music=len(music), zim_files=zim_files, total_zim=len(zim_files), kiwix_available=KIWIX_AVAILABLE, kiwix_port=KIWIX_PORT, main_limit=MAIN_PAGE_LIMIT, get_audio_type=get_audio_type) @app.route('/browse/') def browse(category): """Browse all items in a category""" votes = get_votes() if category == 'photos': items = [] for f in sorted(os.listdir(PHOTOS_DIR)): if f.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')): items.append({ 'filename': f, 'votes': votes.get(f'photo_{f}', 0) }) elif category == 'music': items = [] for f in sorted(os.listdir(MUSIC_DIR)): if f.lower().endswith(('.mp3', '.ogg', '.wav', '.m4a')): items.append({ 'filename': f, 'votes': votes.get(f'music_{f}', 0) }) elif category == 'zim': items = [f for f in sorted(os.listdir(ZIM_DIR)) if f.endswith('.zim')] # Get file sizes zim_sizes = {} for f in items: size = os.path.getsize(os.path.join(ZIM_DIR, f)) / (1024 * 1024) # MB zim_sizes[f] = size return render_template_string(BROWSE_TEMPLATE, category=category, items=items, zim_sizes=zim_sizes, kiwix_port=KIWIX_PORT, get_audio_type=get_audio_type) else: return "Invalid category", 404 return render_template_string(BROWSE_TEMPLATE, category=category, items=items, kiwix_port=KIWIX_PORT, get_audio_type=get_audio_type) @app.route('/post_notice', methods=['POST']) def post_notice(): content = request.form.get('content', '').strip()[:500] expire_days = min(int(request.form.get('expire_days', 7)), 30) if content: notice_id = hashlib.md5(f"{content}{time.time()}".encode()).hexdigest()[:8] notice_data = { 'content': content, 'posted': time.time(), 'expires': time.time() + (expire_days * 86400) } with open(os.path.join(NOTICES_DIR, f"{notice_id}.json"), 'w') as f: json.dump(notice_data, f) return 'Posted! Back' @app.route('/upload_photo', methods=['POST']) def upload_photo(): if 'photo' in request.files: photo = request.files['photo'] if photo.filename: # Simple filename sanitization ext = photo.filename.rsplit('.', 1)[1].lower() if ext in ['jpg', 'jpeg', 'png', 'gif']: filename = f"{int(time.time())}_{hashlib.md5(photo.filename.encode()).hexdigest()[:8]}.{ext}" photo.save(os.path.join(PHOTOS_DIR, filename)) return 'Uploaded! Back' @app.route('/upload_music', methods=['POST']) def upload_music(): if 'music' in request.files: music = request.files['music'] if music.filename: # Simple filename sanitization ext = music.filename.rsplit('.', 1)[1].lower() if ext in ['mp3', 'ogg', 'wav', 'm4a']: filename = f"{int(time.time())}_{hashlib.md5(music.filename.encode()).hexdigest()[:8]}.{ext}" music.save(os.path.join(MUSIC_DIR, filename)) return 'Uploaded! Back' @app.route('/photo/') def serve_photo(filename): # Basic path traversal protection if '..' in filename or '/' in filename: return "Nice try", 403 return send_file(os.path.join(PHOTOS_DIR, filename)) @app.route('/music/') def serve_music(filename): # Basic path traversal protection if '..' in filename or '/' in filename: return "Nice try", 403 filepath = os.path.join(MUSIC_DIR, filename) # Support range requests for better audio streaming range_header = request.headers.get('range', None) if not range_header: return send_file(filepath) size = os.path.getsize(filepath) byte1, byte2 = 0, None m = re.search('(\d+)-(\d*)', range_header) g = m.groups() if g[0]: byte1 = int(g[0]) if g[1]: byte2 = int(g[1]) length = size - byte1 if byte2 is not None: length = byte2 - byte1 + 1 data = None with open(filepath, 'rb') as f: f.seek(byte1) data = f.read(length) rv = Response(data, 206, mimetype=get_audio_type(filename), direct_passthrough=True) rv.headers.add('Content-Range', f'bytes {byte1}-{byte1 + length - 1}/{size}') rv.headers.add('Accept-Ranges', 'bytes') return rv @app.route('/vote///', methods=['POST']) def vote(type, filename, direction): # Basic path traversal protection if '..' in filename or '/' in filename: return jsonify({'error': 'invalid'}), 403 # Convert direction to vote value if direction == 'up': value = 1 elif direction == 'down': value = -1 else: return jsonify({'error': 'invalid vote direction'}), 400 votes = get_votes() vote_key = f'{type}_{filename}' current_votes = votes.get(vote_key, 0) new_votes = current_votes + value votes[vote_key] = new_votes # Check if file should be deleted (score <= -5) if new_votes <= -5: try: if type == 'photo': os.remove(os.path.join(PHOTOS_DIR, filename)) elif type == 'music': os.remove(os.path.join(MUSIC_DIR, filename)) else: return jsonify({'error': 'invalid type'}), 400 del votes[vote_key] save_votes(votes) return jsonify({'status': 'deleted'}) except: pass save_votes(votes) return jsonify({'votes': new_votes}) @app.route('/abuse//', methods=['POST']) def report_abuse(type, filename): # Basic path traversal protection if '..' in filename or '/' in filename: return jsonify({'error': 'invalid'}), 403 # Immediately delete the file try: if type == 'photo': os.remove(os.path.join(PHOTOS_DIR, filename)) elif type == 'music': os.remove(os.path.join(MUSIC_DIR, filename)) else: return jsonify({'error': 'invalid type'}), 400 # Remove any votes for this file votes = get_votes() vote_key = f'{type}_{filename}' if vote_key in votes: del votes[vote_key] save_votes(votes) return jsonify({'status': 'deleted'}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/zim/') def serve_zim(filename): # Basic path traversal protection if '..' in filename or '/' in filename: return "Nice try", 403 return send_file(os.path.join(ZIM_DIR, filename), as_attachment=True) def cleanup(): """Cleanup function to stop kiwix-serve on exit""" stop_kiwix_server() def signal_handler(signum, frame): """Handle shutdown signals""" print("\nShutting down Spore Hub...") cleanup() sys.exit(0) if __name__ == '__main__': import atexit import sys # Register cleanup atexit.register(cleanup) signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) # Start kiwix server if available start_kiwix_server() # For production on tiny hardware: # - Use lighttpd + flup or uwsgi # - Or run with gunicorn using 1-2 workers # - Disable debug mode! # Example lighttpd config: # server.modules += ( "mod_fastcgi" ) # fastcgi.server = ( # "/spore" => (( # "socket" => "/tmp/spore.sock", # "bin-path" => "/path/to/spore_app.py", # "max-procs" => 1 # )) # ) app.run(host='0.0.0.0', port=8080, debug=False)