#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
#  main.py
#
#  Copyright 2014 Fanir
#
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
#  MA 02110-1301, USA.
#
#
# SETTINGS CAN BE FOUND AT THE BOTTOM OF THE FILE

import re
import signal
import socket
import string
import sys
from random import choice
from select import (
    poll, POLLIN, POLLPRI, POLLOUT, POLLERR, POLLHUP, POLLNVAL,
)
from threading import Event, Lock, Thread, current_thread
from time import sleep

bot = None


class log():
    """Tiny level-filtered console logger, used through the class itself.

    Messages are printed when the module-level ``LOGLEVEL`` is less than or
    equal to the level of the message.
    """

    DEBUG, INFO, WARNING, ERROR, FATAL, SILENT = 0, 1, 2, 3, 4, 5

    levels = {
        0: "[DEBUG] ",
        1: "[INFO] ",
        2: "[WARNING] ",
        3: "[ERROR] ",
        4: "[FATAL] ",
        5: "[SILENT] ",
    }

    @staticmethod
    def show(level, msg):
        """Print ``msg`` with its level tag if ``level`` passes the threshold.

        ``level`` must be one of DEBUG..FATAL; SILENT is only meaningful as a
        threshold. The threshold is read from the module global ``LOGLEVEL``;
        when that global is not defined (e.g. the module was imported instead
        of run as a script) everything is shown.

        Raises:
            ValueError: if ``level`` is not one of DEBUG..FATAL.
        """
        if level not in range(log.DEBUG, log.SILENT):
            raise ValueError("That's not a loglevel!")
        threshold = globals().get("LOGLEVEL", log.DEBUG)
        if threshold <= level:
            print("".join((log.levels[level], msg)))


class pircbot():
    """A small threaded IRC client.

    One thread (``recv``) reads bytes from the socket into ``recvbuffer``,
    a second one (``parser``) takes complete lines out of that buffer and
    reacts to them. ``die_event`` is the shared shutdown flag for both.
    """

    # nick!ident@host, as found in message prefixes
    _MASK_RE = re.compile(r"(\S+)!(\S+)@(\S+)")
    # a full hostmask at the very end of the RPL_WELCOME text
    _WELCOME_MASK_RE = re.compile(r" (\S+!\S+@\S+)$")

    def __init__(self, nicknames, server, port=6667, ident="pircbot",
                 realname="pircbot", serverpasswd="",
                 encodings=("utf-8", "latin-1"), users="",
                 query_type="", command_prefix=".",
                 parser_wait_time=0.1):
        """Store the configuration; no network activity happens here.

        Args:
            nicknames: nicknames to try, in order of preference.
            server: host name of the IRC server.
            port: TCP port of the IRC server.
            ident: value sent as user name in the USER command.
            realname: value sent as real name in the USER command.
            serverpasswd: sent with PASS before registering, unless empty.
            encodings: codecs to try when converting text, in order.
            users: mapping of hostmask regex -> list of allowed commands.
                Any falsy value (including the default "") means "nobody".
            query_type: upper-cased and stored as ``self.query_type``.
            command_prefix: prefix that marks a message as a bot command.
            parser_wait_time: seconds the parser sleeps when no complete
                line is buffered.
        """
        self.server = server
        self.port = port
        self.serverpasswd = serverpasswd
        self.encodings = encodings
        # Copy: nicknames are consumed with pop(0) and the caller's list
        # should not shrink as a side effect.
        self.nicknames = list(nicknames)
        self.ident = ident
        self.realname = realname
        # The privilege check iterates over .items(); normalise "" to {}.
        self.users = users if users else {}
        self.cmdprefix = command_prefix
        self.query_type = query_type.upper()
        self.parser_wait_time = parser_wait_time

        self.user = {}
        self.socket = None
        # Start empty: bytearray(1024) would prepend 1024 NUL bytes to the
        # first line received.
        self.recvbuffer = bytearray()
        # recv() appends to the buffer while parser() replaces it; the lock
        # keeps bytes from being appended to a buffer that is being dropped.
        self._buffer_lock = Lock()
        self.recvloop = None
        self.parseloop = None
        self.die_event = Event()
        self.ready = False
        self.mode_reply = {}

    def connect(self):
        """Open the connection, start the receive thread and register.

        On a connection error the error is logged, the socket is closed and
        ``die_event`` is set so that callers waiting for the bot do not wait
        forever; ``recvloop`` stays ``None`` in that case.

        Raises:
            ValueError: if no nickname is left to register with.
        """
        if not self.nicknames:
            raise ValueError("at least one nickname is required")

        # connect
        self.socket = socket.socket()
        log.show(log.DEBUG, "--- SOCKET OPENING ---")
        try:
            self.socket.connect((self.server, self.port))
        except socket.error as e:
            log.show(log.FATAL, "Fehler: %s" % e)
            self.socket.close()
            self.die_event.set()
            return

        # start getting data
        self.recvloop = Thread(target=self.recv, name="recvloop")
        self.recvloop.start()

        # optionally send a server password
        if self.serverpasswd != "":
            self.send("PASS %s" % self.serverpasswd)
        # get a nick
        self.send("NICK %s" % self.nicknames.pop(0))
        # set user data
        self.send("USER %s 0 * :%s" % (self.ident, self.realname))

    # implements irc command QUIT and (more or less) clean exiting
    def disconnect(self, reason="", send_quit=True):
        """Stop both worker threads and close the socket.

        Args:
            reason: text sent with QUIT.
            send_quit: send a QUIT command first (best effort).
        """
        if send_quit:
            try:
                self.send("QUIT :%s" % reason)
                sleep(1)
            except Exception as e:
                # Best effort only: the connection may already be gone.
                log.show(log.DEBUG, "Could not send QUIT: %s" % e)

        self.die_event.set()

        # Wait up to 15 seconds for the workers to notice the event. The
        # calling thread is skipped: the parser itself calls disconnect()
        # on KILL and cannot wait for its own end.
        me = current_thread()
        workers = [t for t in (self.recvloop, self.parseloop)
                   if t is not None and t is not me]
        for _ in range(15):
            if not any(t.is_alive() for t in workers):
                break
            sleep(1)

        log.show(log.DEBUG, "--- SOCKET CLOSING ---")
        if self.socket is None:
            return
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except socket.error as e:
            # e.g. the socket was never connected or is already shut down
            log.show(log.DEBUG, "Socket shutdown failed: %s" % e)
        finally:
            self.socket.close()

    ### threaded functions ###

    # loop for receiving data from irc
    def recv(self):
        """Receive loop: append incoming bytes to ``recvbuffer``.

        Polls the socket with a one second timeout so ``die_event`` is
        checked regularly, and (re)starts the parser thread whenever data
        arrived and no parser is running. A closed or broken connection sets
        ``die_event``.
        """
        log.show(log.DEBUG, "--- RECVLOOP STARTING ---")
        self.parseloop = Thread(target=self.parser, name="parser")
        fd = self.socket.fileno()
        p = poll()
        p.register(fd, POLLIN)
        while not self.die_event.is_set():
            for ready_fd, event in p.poll(1000):
                if ready_fd != fd:
                    continue
                # Test the bit instead of comparing for equality: the mask
                # may carry POLLHUP/POLLERR next to POLLIN.
                if event & POLLIN:
                    try:
                        data = self.socket.recv(1024)
                    except socket.error as e:
                        log.show(log.ERROR, "Receiving failed: %s" % e)
                        data = b""
                    if not data:
                        # An empty read means the peer closed the connection.
                        log.show(log.WARNING, "Connection closed")
                        self.die_event.set()
                        break
                    with self._buffer_lock:
                        self.recvbuffer.extend(data)
                    if not self.parseloop.is_alive():
                        # Thread objects cannot be restarted; make a new one.
                        self.parseloop = Thread(target=self.parser,
                                                name="parser")
                        self.parseloop.start()
                elif event & (POLLERR | POLLHUP | POLLNVAL):
                    log.show(log.WARNING, "Connection lost")
                    self.die_event.set()
                    break
        log.show(log.DEBUG, "--- RECVLOOP EXITING ---")

    # loop for parsing incoming data
    def parser(self):
        """Parse loop: handle buffered lines until ``die_event`` is set.

        Sleeps ``parser_wait_time`` seconds whenever no complete line is
        buffered. An error while handling one line is logged and does not
        stop the loop.
        """
        log.show(log.DEBUG, "--- PARSELOOP STARTING ---")
        while not self.die_event.is_set():
            rawbytes = self._pop_line()
            if rawbytes is None:
                sleep(self.parser_wait_time)
                continue
            try:
                self._handle_line(self.decode(rawbytes))
            except Exception as e:
                log.show(log.ERROR, "Could not handle line %r: %s"
                         % (bytes(rawbytes), e))
        log.show(log.DEBUG, "--- PARSELOOP EXITING ---")

    def _pop_line(self):
        """Remove and return the first complete line (without CRLF), or None."""
        with self._buffer_lock:
            # "in", not endswith(): complete lines must be handled even
            # while a partial line is waiting behind them.
            if b"\r\n" not in self.recvbuffer:
                return None
            rawline, _, self.recvbuffer = self.recvbuffer.partition(b"\r\n")
        return rawline

    @staticmethod
    def _parse_line(rawline):
        """Split a decoded IRC line into (prefix, origin, command, params).

        ``origin`` holds nick/ident/host/mask for a user prefix, ``server``
        for any other prefix, and is empty without a prefix. The trailing
        argument (after " :"), stripped, is always the last element of
        ``params`` - an empty string if the line has none.
        """
        line = rawline.split(" :", 1)
        head = line[0].lstrip("\x00").split(" ")
        larg = line[1] if len(line) > 1 else ""

        # parse prefix
        origin = {}
        prefix = ""
        if head[0].startswith(":"):
            prefix = head.pop(0)[1:]
            match = pircbot._MASK_RE.match(prefix) if "@" in prefix else None
            if match:
                (origin["nick"], origin["ident"],
                 origin["host"]) = match.groups()
                origin["mask"] = prefix
            else:
                origin["server"] = prefix

        # parse command; a line consisting of a prefix only has none
        command = head.pop(0) if head else ""

        # parse params
        params = head
        params.append(larg.strip())
        return prefix, origin, command, params

    def _handle_line(self, rawline):
        """React to one decoded line received from the server."""
        prefix, origin, command, params = self._parse_line(rawline)

        log.show(log.DEBUG, " > %s" % rawline)

        # PING
        if command == "PING":
            self.send("PONG %s" % params[0])

        # PRIVMSG and NOTICE
        elif command in ("PRIVMSG", "NOTICE"):
            text = params[1] if len(params) > 1 else ""
            if self.cmdprefix and text.startswith(self.cmdprefix):
                args = [v for v in text[len(self.cmdprefix):].split(" ")
                        if v != ""]
                # a message consisting of the prefix only is not a command
                if args:
                    rp = self.on_command(command, prefix, origin, params[0],
                                         args[0].lower(), args[1:])
                    if rp not in (None, ""):
                        self.reply(origin, params[0], rp, command)

        # 221 (RPL_UMODEIS), only answered if somebody asked for the modes
        elif command == "221" and self.mode_reply:
            self.query(self.mode_reply["to"], "Modes are %s" % params[1],
                       self.mode_reply["type"])
            self.mode_reply = {}

        # INVITE
        elif command == "INVITE":
            if self.check_privileges(origin["mask"], command):
                self.join(params[1])
            else:
                self.query(origin["nick"],
                           "You can not force me to do that!", "NOTICE")

        # 001 (RPL_WELCOME)
        elif command == "001":
            match = self._WELCOME_MASK_RE.search(params[-1])
            if match:
                self.user["mask"] = match.group(1)
                (self.user["nick"], self.user["ident"],
                 self.user["host"]) = self._MASK_RE.match(
                     self.user["mask"]).groups()
            elif len(params) > 1 and params[0]:
                # The welcome text carries no hostmask here; the first
                # parameter of a numeric reply is the nick it is sent to.
                self.user["nick"] = params[0]
            self.ready = True

        # 433 (ERR_NICKNAMEINUSE)
        elif command == "433":
            if self.nicknames:
                self.send("NICK %s" % self.nicknames.pop(0))
            else:
                log.show(log.FATAL, "All configured nicknames are in use")
                self.disconnect("No free nickname left")

        # KILL
        elif command == "KILL":
            log.show(log.WARNING,
                     "Got killed by %s: %s" % (params[0], params[1:]))
            self.disconnect(send_quit=False)

    ### helper functions ###

    # encodes data for irc
def encode(self, textstring): for codec in self.encodings: try: return textstring.encode(codec) except UnicodeDecodeError: continue return textstring.encode(self.encodings[0], 'ignore') # decodes data from irc def decode(self, textstring): for codec in self.encodings: try: return textstring.decode(codec) except UnicodeDecodeError: continue return textstring.decode(self.encodings[0], 'ignore') # checks if a given user may execute a given command def check_privileges(self, usermask, command): for user, privs in self.users.items(): if re.search(user, usermask, re.IGNORECASE) != None: if command.lower() in privs or "*" in privs: return True return False # checks if s is a valid channel name def is_channel(self, s): return (True if s[0] in ('&', '#', '!') else False) def exec_on_ready(self, command, retry_times=-1, interval=1): if command.startswith("self."): if retry_times == -1: while not self.ready: sleep(interval) else: cnt = 0 while not self.ready and cnt elif command == "join": if len(params)>0: if self.check_privileges(origin["mask"], command): self.join(params[0]) else: self.query(origin["nick"], "You cannot do that!", in_query_type) # part elif command == "part": if len(params)>0: if self.check_privileges(origin["mask"], command): self.part(params[0]) else: self.query(origin["nick"], "You cannot do that!", in_query_type) # mode ± elif command == "mode": if self.check_privileges(origin["mask"], command): if len(params)==0: self.mode_reply["to"] = origin["nick"] self.mode_reply["type"] = in_query_type self.get_modes() else: self.set_mode(" ".join(params) if len(params)>0 else params) else: self.query(origin["nick"], "You cannot do that!", in_query_type) # die [] elif command == "die": if self.check_privileges(origin["mask"], command): self.disconnect("".join(params) if len(params)>0 else "".join((origin["nick"], " shot me, dying now... Bye..."))) else: self.query(origin["nick"], "Go die yourself!", in_query_type) else: replies = [ ("What? 
\"%s\" is not a command!", 15), ("%s? What's that?", 3), ("Sorry, I don't know how to %s...", 1) ] self.query(origin["nick"], choice([val for val, cnt in replies for i in range(cnt)]) % (command), in_query_type) def parseargs(): import argparse p = argparse.ArgumentParser( description = "i think my desc is missing") p.add_argument("action", default="help", choices = ["start", "stop"], help="What to do?") #p.add_argument("--daemon", "-d", type = bool, choices = [1, 0], default=1, help="Daemonize, Default: 1") return p.parse_args() def main(): global bot args = parseargs() if args.action == "start": try: bot = pircbot(nicknames=NICKNAMES, ident=IDENT, realname=REALNAME, server=SERVER, port=PORT, serverpasswd=SERVERPASSWD, encodings=ENCODINGS, users=USERS, query_type=QUERY_TYPE, command_prefix=COMMAND_PREFIX, parser_wait_time=PARSER_WAIT_TIME) bot.connect() # wait for the bot to become ready while bot.ready and not bot.die_event.is_set() == False: sleep(1) if not bot.die_event.is_set(): # set modes and join channels bot.set_mode(MODES) for channel in CHANNELS: bot.join(channel) # while bot is active, do nothing while not bot.die_event.is_set(): sleep(1) except KeyboardInterrupt: log.show(log.INFO, "Got Ctrl-C, dying now...") bot.disconnect("Ouch! Got shot by Ctrl-C, dying now... See you!") elif args.action == "stop": print("nope!") log.show(log.DEBUG, "--- MAIN EXITING ---") return 0 if __name__ == '__main__': ################ ### SETTINGS ### ################ # Also known as username. Some IRC-internal. # By default, the nickname will be used as ident. IDENT = "chalkbot" # The list of nicknames to try. If the first one is not aviable, it will # try the second, and so on... # You should specfy at least two nicks. NICKNAMES = ["chalkbot", "chalkbot_", "chalkbot__"] REALNAME = "A ChalkBot Instance" # Command for registering with nickserv, without leading slash. NICKSERVCMD = "" MODES = "+iB" # The Server name to connect to. Duh! 
SERVER = "fanir.de" #SERVER = "localhost" # "Default" is 6667. An often used port for SSL would be 6697, if SSL would be # supported. Maybe in a future, far far away... PORT = 6667 # Serverpassword. Empty in most cases. SERVERPASSWD = "" # A comma-seperated list of channels to join, enclosed by braces. CHANNELS = ["#bots"] # The encodings to try when getting messages from IRC. Will be tried in the given order. # The first one is used to encode data when sending stuff. # The list given shoud do just fine in most networks, I assume. # Also comma seperated and enclosed by braces. ENCODINGS = ['utf-8', 'latin-1', 'iso-8859-1', 'cp1252'] # List of users (hostmasks as regex) and the commands they are allowed to execute. # "*" Can be used instead of commands to allow every command. # All command names should be lowercase. USERS = { "Fanir\!fanir@.*": ["*"], "\!@some weird user that cannot exist": ["join", "part", "invite"], } # The prefix for commands for the bot. COMMAND_PREFIX = "." # Which way should be used to speak to users? # "" means the type of the incoming message should be used. # One of: "NOTICE", "PRIVMSG", "" QUERY_TYPE = "" # With how much information do you want to be annoyed? # DEBUG spams most, FATAL least. WARNING should be a good tradeoff. # One of: log.DEBUG, log.INFO, log.WARNING, log.ERROR, log.FATAL LOGLEVEL = log.DEBUG # The time the parser for incoming data should wait between each attempt to read new data in seconds. # High values will certainly make the bot reply slowly while very low values increads cpu load and therefore will perform badly on slow machines. # You should keep it between 1 and 0.001 seconds. # For gods sake, don't set it to 0! PARSER_WAIT_TIME = 0.05 ####################### ### END OF SETTINGS ### ####################### main()