19
loading...
This website collects cookies to deliver better user experience
user: guess
bot: From what number?
user: 25
bot: To what number?
user: 100
bot: Guess a number between 25 to 100
user: 64
bot: too small
user: 91
bot: too large
......
user: 83
bot: Correct! You spent 6 times to guess this number.
def guess(self):
    '''Game function

    Asks the user for two bounds, draws a secret integer between them
    (both ends included) and keeps asking for guesses until the secret
    is hit, then replies with the number of attempts.
    '''
    first = self.ask_number('From what number?')
    second = self.ask_number('To what number?')
    # randint raises ValueError when the lower bound exceeds the upper one,
    # so accept the bounds in either order.
    min_value, max_value = sorted((first, second))
    secret = randint(min_value, max_value)
    msg = f'Guess a number between {min_value} to {max_value}'
    counter = 0
    while True:
        counter += 1
        answer = self.ask_number(msg)
        if answer > secret:
            msg = 'Too large'
        elif answer < secret:
            msg = 'Too small'
        else:
            break
    self.reply(f'You spent {counter} times to guess the secret number.')
git clone https://github.com/lancatlin/python-chatbot-context.git
cd python-chatbot-context
pipenv install
pipenv shell
LINE_TOKEN=YOUR_TOKEN
LINE_SECRET=YOUR_SECRET
python manage.py migrate # for first execution
python manage.py runserver
MessageQueue class:
# guess/message_queue.py
import queue
from threading import RLock
from .line import get_room
class RequestTimout(Exception):
    '''Raised when a requested message does not arrive before the timeout.'''
class MessageQueue:
    '''Hand incoming events over to code that is waiting for them, per room.

    Every room owns two single-slot queues: one holds a marker while a
    `request` call is waiting, the other carries the event that `handle`
    passes to that waiting call. All state lives on the class and is
    shared by every caller.
    '''
    # Guards creation of the per-room queues.
    __lock = RLock()
    # room -> Queue(maxsize=1); non-empty while a request is pending.
    __requests = {}
    # room -> Queue(maxsize=1); carries the event that answers the request.
    __responses = {}

    @classmethod
    def create_if_not_exists(cls, room):
        '''Create the requests and responses queues for the room if not exists'''
        with cls.__lock:
            if room not in cls.__requests:
                cls.__requests[room] = queue.Queue(maxsize=1)
            if room not in cls.__responses:
                cls.__responses[room] = queue.Queue(maxsize=1)

    @classmethod
    def handle(cls, event):
        '''Deliver the event to a pending request for its room, if there is one.

        Returns True when the event was handed to a waiting `request` call,
        False when nobody is waiting (or the hand-over slot stayed occupied
        for one second), in which case the event is left to the caller.
        '''
        room = get_room(event)
        cls.create_if_not_exists(room)
        try:
            # Claim the pending request without blocking: if the requester
            # timed out and cleared it in the meantime, a blocking get()
            # would hang this handler forever.
            cls.__requests[room].get_nowait()
        except queue.Empty:
            # No request, ignore the message
            return False
        try:
            cls.__responses[room].put(event, timeout=1)
        except queue.Full:
            # put() signals an occupied slot with queue.Full, not queue.Empty.
            return False
        return True

    @classmethod
    def request(cls, room, timeout=30):
        '''Request a message, block until message comes in or timeout

        Returns the event passed to `handle` for this room. Raises
        RequestTimout when nothing arrives within `timeout` seconds, and
        queue.Full when a request for the room is already pending.
        '''
        cls.create_if_not_exists(room)
        cls.__requests[room].put_nowait(True)
        try:
            return cls.__responses[room].get(timeout=timeout)
        except queue.Empty:
            # Withdraw the request so later messages are not swallowed.
            cls.clear(room)
            raise RequestTimout

    @classmethod
    def clear(cls, room):
        '''Clear the requests'''
        cls.create_if_not_exists(room)
        try:
            cls.__requests[room].get_nowait()
        except queue.Empty:
            pass
# guess/guess.py
from .message_queue import MessageQueue, RequestTimout
from .line import reply_text, get_room, get_msg
from random import randint
class Guess:
    '''Guess handle a guess number game'''

    def __init__(self, event):
        '''Play one whole game, starting from the event that triggered it.

        The game runs inside the constructor. If waiting for the user
        raises RequestTimout, a 'Timeout' reply is sent instead.
        '''
        self.event = event
        try:
            self.guess()
        except RequestTimout:
            self.reply('Timeout')

    def guess(self):
        '''Game function

        Asks the user for two bounds, draws a secret integer between them
        (both ends included) and keeps asking for guesses until the secret
        is hit, then replies with the number of attempts.
        '''
        first = self.ask_number('From what number?')
        second = self.ask_number('To what number?')
        # randint raises ValueError when the lower bound exceeds the upper
        # one, so accept the bounds in either order.
        min_value, max_value = sorted((first, second))
        secret = randint(min_value, max_value)
        msg = f'Guess a number between {min_value} to {max_value}'
        counter = 0
        while True:
            counter += 1
            answer = self.ask_number(msg)
            if answer > secret:
                msg = 'Too large'
            elif answer < secret:
                msg = 'Too small'
            else:
                break
        self.reply(f'You spent {counter} times to guess the secret number.')

    def ask(self, *msg):
        '''Ask a question to current user

        Sends `msg`, waits for the next event in the same room, stores it
        as the current event and returns its message content.
        '''
        self.reply(*msg)
        self.event = MessageQueue.request(get_room(self.event))
        return get_msg(self.event)

    def ask_number(self, *msg):
        '''Ask a number, if not number, ask again

        Returns the answer converted with int(). After an invalid answer
        the question is repeated with a single 'Please input an integer.'
        hint in front, however many times the user gets it wrong.
        '''
        prompt = msg
        while True:
            content = self.ask(*prompt)
            try:
                return int(content)
            except ValueError:
                # Rebuild from the original question so the hint is not
                # stacked again on every failed attempt.
                prompt = ('Please input an integer.', *msg)

    def reply(self, *msg):
        '''Reply words to user'''
        reply_text(self.event, *msg)