From a50ca549996d9e90c3bb7b616675f2cb6b8d6a27 Mon Sep 17 00:00:00 2001 From: Matthew Exon Date: Sat, 8 May 2021 21:27:13 +0200 Subject: [PATCH] Use workerqueue for mailstream jobs instead of custom table --- mailstream/mailstream.php | 248 +++++++++++++++----------------------- 1 file changed, 99 insertions(+), 149 deletions(-) diff --git a/mailstream/mailstream.php b/mailstream/mailstream.php index c22516b3..d877f1eb 100644 --- a/mailstream/mailstream.php +++ b/mailstream/mailstream.php @@ -2,7 +2,7 @@ /** * Name: Mail Stream * Description: Mail all items coming into your network feed to an email address - * Version: 1.1 + * Version: 2.0 * Author: Matthew Exon */ @@ -14,7 +14,9 @@ use Friendica\Database\DBA; use Friendica\DI; use Friendica\Model\Item; use Friendica\Model\Post; +use Friendica\Model\User; use Friendica\Protocol\Activity; +use Friendica\Util\DateTimeFormat; /** * Sets up the addon hooks and the database table @@ -26,43 +28,32 @@ function mailstream_install() Hook::register('post_local_end', 'addon/mailstream/mailstream.php', 'mailstream_post_hook'); Hook::register('post_remote_end', 'addon/mailstream/mailstream.php', 'mailstream_post_hook'); Hook::register('cron', 'addon/mailstream/mailstream.php', 'mailstream_cron'); + Hook::register('mailstream_send_hook', 'addon/mailstream/mailstream.php', 'mailstream_send_hook'); - if (DI::config()->get('mailstream', 'dbversion') == '0.1') { - q('ALTER TABLE `mailstream_item` DROP INDEX `uid`'); - q('ALTER TABLE `mailstream_item` DROP INDEX `contact-id`'); - q('ALTER TABLE `mailstream_item` DROP INDEX `plink`'); - q('ALTER TABLE `mailstream_item` CHANGE `plink` `uri` char(255) NOT NULL'); - DI::config()->set('mailstream', 'dbversion', '0.2'); - } - if (DI::config()->get('mailstream', 'dbversion') == '0.2') { - q('DELETE FROM `pconfig` WHERE `cat` = "mailstream" AND `k` = "delay"'); - DI::config()->set('mailstream', 'dbversion', '0.3'); - } - if (DI::config()->get('mailstream', 'dbversion') == '0.3') { - q('ALTER TABLE `mailstream_item` CHANGE `created` `created` timestamp NOT NULL DEFAULT now()'); - q('ALTER TABLE `mailstream_item` CHANGE `completed` `completed` timestamp NULL DEFAULT NULL'); - DI::config()->set('mailstream', 'dbversion', '0.4'); - } - if (DI::config()->get('mailstream', 'dbversion') == '0.4') { - q('ALTER TABLE `mailstream_item` CONVERT TO CHARACTER SET utf8 COLLATE utf8_bin'); - DI::config()->set('mailstream', 'dbversion', '0.5'); - } - if (DI::config()->get('mailstream', 'dbversion') == '0.5') { - DI::config()->set('mailstream', 'dbversion', '1.0'); - } + Logger::info("mailstream: installed"); +} - if (DI::config()->get('retriever', 'dbversion') != '1.0') { - $schema = file_get_contents(dirname(__file__).'/database.sql'); - $arr = explode(';', $schema); - foreach ($arr as $a) { - $r = q($a); - } - DI::config()->set('mailstream', 'dbversion', '1.0'); +/** + * Enforces that mailstream_install has set up the current version + */ +function mailstream_check_version() +{ + if (!is_null(DI::config()->get('mailstream', 'dbversion'))) { + DI::config()->delete('mailstream', 'dbversion'); + Logger::info("mailstream_check_version: old version detected, reinstalling"); + mailstream_install(); + Hook::loadHooks(); + Hook::add( + 'mailstream_convert_table_entries', + 'addon/mailstream/mailstream.php', + 'mailstream_convert_table_entries' + ); + Hook::fork(PRIORITY_LOW, 'mailstream_convert_table_entries'); } } /** - * This funciton indicates a module that can be wrapped in the LegacyModule class + * This function indicates a module that can be wrapped in the LegacyModule class */ function mailstream_module() { @@ -114,16 +105,40 @@ function mailstream_generate_id($uri) return $message_id; } +function mailstream_send_hook(&$a, $data) +{ + $criteria = array('uid' => $data['uid'], 'contact-id' => $data['contact-id'], 'uri' => $data['uri']); + $item = Post::selectFirst([], $criteria); + if (empty($item)) { + Logger::error('mailstream_send_hook could not find item'); + return; + } + + $user = User::getById($item['uid']); + if (empty($user)) { + Logger::error('mailstream_send_hook could not fund user', ['uid' => $item['uid']]); + return; + } + + if (!mailstream_send($data['message_id'], $item, $user)) { + $delayed = date(DateTimeFormat::utc('now + 1 hour')); + $data['tries'] += 1; + Hook::fork(['priority' => PRIORITY_LOW, 'delayed' => $delayed], 'mailstream_send_hook', $data); + } +} + /** - * Called when either a local or remote post is created. Creates a - * record in the mailstream_item table to track this email, and then - * immediately attempts to send it + * Called when either a local or remote post is created. If + * mailstream is enabled and the necessary data is available, forks a + * workerqueue item to send the email. * * @param Friendica\App $a App object (unused) * @param array $item content of the item (may or may not already be stored in the item table) */ function mailstream_post_hook(&$a, &$item) { + mailstream_check_version(); + if (!DI::pConfig()->get($item['uid'], 'mailstream', 'enabled')) { Logger::debug('mailstream: not enabled for item ' . $item['id']); return; @@ -152,50 +167,13 @@ function mailstream_post_hook(&$a, &$item) } $message_id = mailstream_generate_id($item['uri']); - q( - "INSERT INTO `mailstream_item` (`uid`, `contact-id`, `uri`, `message-id`) " . - "VALUES (%d, '%s', '%s', '%s')", - intval($item['uid']), - intval($item['contact-id']), - DBA::escape($item['uri']), - DBA::escape($message_id) - ); - $r = q( - 'SELECT * FROM `mailstream_item` WHERE `uid` = %d AND `contact-id` = %d AND `uri` = "%s"', - intval($item['uid']), - intval($item['contact-id']), - DBA::escape($item['uri']) - ); - if (count($r) != 1) { - Logger::info('mailstream_post_remote_hook: Unexpected number of items returned from mailstream_item'); - return; - } - $ms_item = $r[0]; - Logger::debug('mailstream_post_remote_hook: created mailstream_item ' . $ms_item['id'] . - ' for item ' . $item['uri'] . ' ' . $item['uid'] . ' ' . $item['contact-id']); - $user = mailstream_get_user($item['uid']); - if (!$user) { - Logger::info('mailstream_post_remote_hook: no user ' . $item['uid']); - return; - } - mailstream_send($ms_item['message-id'], $item, $user); -} -/** - * Converts a user ID into a full user record from the corresponding database table - * - * @param int $uid ID of the user to query - * - * @return array results from the user table - */ -function mailstream_get_user($uid) -{ - $r = q('SELECT * FROM `user` WHERE `uid` = %d', intval($uid)); - if (count($r) != 1) { - Logger::info('mailstream_post_remote_hook: Unexpected number of users returned'); - return; - } - return $r[0]; + $send_hook_data = array('uid' => $item['uid'], + 'contact-id' => $item['contact-id'], + 'uri' => $item['uri'], + 'message_id' => $message_id, + 'tries' => 0); + Hook::fork(PRIORITY_LOW, 'mailstream_send_hook', $send_hook_data); } /** @@ -326,10 +304,13 @@ function mailstream_subject($item) intval($item['uid']) ); if (!DBA::isResult($r)) { - Logger::error( - 'mailstream_subject no contact for item', - ['item id' => $item['id'], 'plink' => $item['plink'], 'contact id' => $item['contact-id'], 'uid' => $item['uid']] - ); + Logger::error( + 'mailstream_subject no contact for item', + ['id' => $item['id'], + 'plink' => $item['plink'], + 'contact id' => $item['contact-id'], + 'uid' => $item['uid']] + ); return DI::l10n()->t("Friendica post"); } $contact = $r[0]; @@ -361,14 +342,24 @@ function mailstream_subject($item) * @param string $message_id ID of the message (RFC 1036) * @param array $item content of the item * @param array $user results from the user table + * + * @return bool True if this message has been completed. False if it should be retried. */ function mailstream_send($message_id, $item, $user) { - if (!$item['visible']) { + if (!is_array($item)) { + Logger::error('mailstream_send item is empty', ['message_id' => $message_id]); return; } + + if (!$item['visible']) { + Logger::debug('mailstream_send item not yet visible', ['item uri' => $item['uri']]); + return false; + } if (!$message_id) { - return; + Logger::error('mailstream_send no message ID supplied', ['item uri' => $item['uri'], + 'user email' => $user['email']]); + return true; } require_once(dirname(__file__).'/phpmailer/class.phpmailer.php'); @@ -418,16 +409,16 @@ function mailstream_send($message_id, $item, $user) if (!$mail->Send()) { throw new Exception($mail->ErrorInfo); } - Logger::debug('mailstream_send sent message ' . $mail->MessageID . ' ' . $mail->Subject); + Logger::debug('mailstream_send sent message', ['message ID' => $mail->MessageID, + 'subject' => $mail->Subject, + 'address' => $address]); } catch (phpmailerException $e) { Logger::debug('mailstream_send PHPMailer exception sending message ' . $message_id . ': ' . $e->errorMessage()); } catch (Exception $e) { Logger::debug('mailstream_send exception sending message ' . $message_id . ': ' . $e->getMessage()); } - // In case of failure, still set the item to completed. Otherwise - // we'll just try to send it over and over again and it'll fail - // every time. - q('UPDATE `mailstream_item` SET `completed` = now() WHERE `message-id` = "%s"', DBA::escape($message_id)); + + return true; } /** @@ -447,60 +438,41 @@ function mailstream_html_wrap(&$text) } /** - * Cron job for the mailstream plugin. Sends delayed messages and cleans up old successful entries from the table. + * Convert v1 mailstream table entries to v2 workerqueue items */ -function mailstream_cron() +function mailstream_convert_table_entries() { - // Only process items older than an hour in cron. This is because - // we want to give mailstream_post_remote_hook a fair chance to - // send the email itself before cron jumps in. Only if - // mailstream_post_remote_hook fails for some reason will this get - // used, and in that case it's worth holding off a bit anyway. $query = <<< EOT SELECT - `mailstream_item`.`message-id`, - `mailstream_item`.`uri`, - `post-user-view`.`id` + `message-id`, + `uri`, + `uid`, + `contact-id` FROM `mailstream_item` - JOIN - `post-user-view` - ON ( - `mailstream_item`.`uid` = `post-user-view`.`uid` AND - `mailstream_item`.`uri` = `post-user-view`.`uri` AND - `mailstream_item`.`contact-id` = `post-user-view`.`contact-id` - ) WHERE - `mailstream_item`.`completed` IS NULL AND - `mailstream_item`.`created` < DATE_SUB(NOW(), INTERVAL 1 HOUR) AND - `post-user-view`.`visible` = 1 -ORDER BY `mailstream_item`.`created` -LIMIT 100 + `mailstream_item`.`completed` IS NULL EOT; $ms_item_ids = q($query); if (DBA::isResult($ms_item_ids)) { - Logger::debug('mailstream_cron processing ' . count($ms_item_ids) . ' items'); + Logger::debug('mailstream_convert_table_entries processing ' . count($ms_item_ids) . ' items'); foreach ($ms_item_ids as $ms_item_id) { + $send_hook_data = array('uid' => $ms_item_id['uid'], + 'contact-id' => $ms_item_id['contact-id'], + 'uri' => $ms_item_id['uri'], + 'message_id' => $ms_item_id['message-id'], + 'tries' => 0); if (!$ms_item_id['message-id'] || !strlen($ms_item_id['message-id'])) { - Logger::info('mailstream_cron: Item ' . $ms_item_id['id'] . - ' URI ' . $ms_item_id['uri'] . ' has no message-id'); - } - $item = Post::selectFirst([], ['id' => $ms_item_id['id']]); - $users = q("SELECT * FROM `user` WHERE `uid` = %d", intval($item['uid'])); - $user = $users[0]; - if ($user && $item) { - mailstream_send($ms_item_id['message-id'], $item, $user); - } else { - Logger::info('mailstream_cron: Unable to find item ' . $ms_item_id['id']); - q( - "UPDATE `mailstream_item` SET `completed` = now() WHERE `message-id` = %d", - intval($ms_item_id['message-id']) - ); + Logger::info('mailstream_cron: Item ' . + $ms_item_id['id'] . ' URI ' . $ms_item_id['uri'] . ' has no message-id'); + continue; } + Logger::info('mailstream_convert_table_entries: convert item to workerqueue', $send_hook_data); + Hook::fork(PRIORITY_LOW, 'mailstream_send_hook', $send_hook_data); } } - mailstream_tidy(); + q('DROP TABLE `mailstream_item`'); } /** @@ -567,25 +539,3 @@ function mailstream_addon_settings_post() DI::pConfig()->delete(local_user(), 'mailstream', 'attachimg'); } } - -/** - * Deletes records from the mailstream_item table older than one year - */ -function mailstream_tidy() -{ - $query = <<< EOT -SELECT - id -FROM - mailstream_item -WHERE - completed IS NOT NULL AND - completed < DATE_SUB(NOW(), INTERVAL 1 YEAR) - -EOT; - $r = q($query); - foreach ($r as $rr) { - q('DELETE FROM mailstream_item WHERE id = %d', intval($rr['id'])); - } - Logger::debug('mailstream_tidy: deleted ' . count($r) . ' old items'); -}